BSOne.SFC/eCrmHE/LIB_Common/CrmSocketBase.pas

619 lines
16 KiB
Plaintext

{*******************************************************}
{ }
{ CrmSocketBase }
{ }
{ Copyright (C) 2022 kku }
{ }
{*******************************************************}
unit CrmSocketBase;
interface
uses
Tocsg.Thread, System.Classes, System.SysUtils, IdScheduler, IdGlobal,
IdThread, IdSchedulerOfThread, IdYarn, IdTask, IdTCPServer,
IdCustomTCPServer, IdContext, IdComponent, IdExceptionCore,
IdResourceStringsCore, IdException, IdTCPClient, Winapi.Windows,
Winapi.Messages;
type
// TIdTCPServer.Active := false 시 TIdScheduler.TerminateAllYarns() 에서 행이 걸리는 문제가 있다..
// 인디 버그 인거 같은데 해결 방법이 없어서 이렇게 재구현 하는걸로 처리함...
// TerminateAllYarns() 만 깔끔하게
TTgSvrScheduler = class(TIdScheduler)
protected
FMaxThreads: Integer;
FThreadPriority: TIdThreadPriority;
FThreadClass: TIdThreadWithTaskClass;
procedure InitComponent; override;
public
destructor Destroy; override;
function NewThread: TIdThreadWithTask; virtual;
function NewYarn(AThread: TIdThreadWithTask = nil): TIdYarnOfThread;
procedure StartYarn(AYarn: TIdYarn; ATask: TIdTask); override;
procedure TerminateYarn(AYarn: TIdYarn); override;
function AcquireYarn: TIdYarn; override;
procedure ReleaseYarn(AYarn: TIdYarn); override;
procedure TerminateAllYarns; override;
property ThreadClass: TIdThreadWithTaskClass read FThreadClass write FThreadClass;
published
property MaxThreads: Integer read FMaxThreads write FMaxThreads;
property ThreadPriority: TIdThreadPriority read FThreadPriority write FThreadPriority default tpNormal;
end;
TCrmServerBase = class(TIdTCPServer)
private
dtActive_: TDateTime;
nPort_: Integer;
protected
fnConnect_,
fnDisconnect_: TIdServerThreadEvent;
procedure SetPort(nVal: Integer);
procedure _Trace(const str: String); overload;
procedure _Trace(const sFormat: String; const Args: array of const); overload;
procedure _EventExecute(AContext: TIdContext); virtual;
procedure _EventConnect(AContext: TIdContext); virtual;
procedure _EventGetClientInfo(AContext: TIdContext); virtual;
procedure _EventDisconnect(AContext: TIdContext); virtual;
procedure _EventException(AContext: TIdContext; AException: Exception);
procedure _EventStatus(ASender: TObject; const AStatus: TIdStatus;
const AStatusText: string);
procedure _ProcessRcv(AContext: TIdContext; sRcv: String); virtual; abstract;
// procedure TimerChop(Sender: TObject);
procedure SetActive(AValue: Boolean); override;
public
Constructor Create(nActivePort: Integer = 0);
Destructor Destroy; override;
// procedure SendPacket(AContext: TIdContext; sSend: String); overload;
// procedure SendPacket(AContext: TIdContext; nCmd: Integer); overload;
property Port: Integer read nPort_ write SetPort;
published
property OnClientConnect: TIdServerThreadEvent write fnConnect_;
property OnClientDisconnect: TIdServerThreadEvent write fnDisconnect_;
property ActiveDateTime: TDateTime read dtActive_;
end;
TThdCrmVwClientBase = class(TTgThread)
protected
sHost_: String;
nPort_: Integer;
Client_: TIdTCPClient;
bConnected_: Boolean;
function GetConnected: Boolean;
procedure SetConnected(bVal: Boolean);
function _ProcessRcv: Boolean;
// function _ProcessSend(Send: ISendPacket): Boolean; virtual;
procedure ProcessRcv(sRcv: String); virtual; abstract;
procedure Execute; override;
public
Constructor Create;
Destructor Destroy; override;
function Connect(const sHost: String; nPort: Integer): Boolean;
end;
implementation
uses
Tocsg.Exception, IdStack, Tocsg.Trace;
{ TTgSvrScheduler }
destructor TTgSvrScheduler.Destroy;
begin
TerminateAllYarns;
inherited Destroy;
end;
procedure TTgSvrScheduler.StartYarn(AYarn: TIdYarn; ATask: TIdTask);
var
LThread: TIdThreadWithTask;
begin
LThread := TIdYarnOfThread(AYarn).Thread;
LThread.Task := ATask;
LThread.Start;
end;
function TTgSvrScheduler.NewThread: TIdThreadWithTask;
begin
Assert(FThreadClass<>nil);
if (FMaxThreads <> 0) and (not ActiveYarns.IsCountLessThan(FMaxThreads + 1)) then begin
raise EIdSchedulerMaxThreadsExceeded.Create(RSchedMaxThreadEx);
end;
Result := FThreadClass.Create(nil, IndyFormat('%s User', [Name])); {do not localize}
if ThreadPriority <> tpNormal then begin
IndySetThreadPriority(Result, ThreadPriority);
end;
end;
function TTgSvrScheduler.NewYarn(AThread: TIdThreadWithTask): TIdYarnOfThread;
begin
if not Assigned(AThread) then begin
raise EIdException.Create(RSThreadSchedulerThreadRequired);
end;
// Create Yarn
Result := TIdYarnOfThread.Create(Self, AThread);
end;
procedure TTgSvrScheduler.TerminateYarn(AYarn: TIdYarn);
var
LYarn: TIdYarnOfThread;
LThread: TIdThreadWithTask;
begin
Assert(AYarn<>nil);
LYarn := TIdYarnOfThread(AYarn);
LThread := LYarn.Thread;
if (LThread <> nil) and (not LThread.Suspended) then begin
// Is still running and will free itself
LThread.Stop;
// Dont free the yarn. The thread frees it (IdThread.pas)
end else
begin
// If suspended, was created but never started
// ie waiting on connection accept
// RLebeau: free the yarn here as well. This allows TIdSchedulerOfThreadPool
// to put the suspended thread, if present, back in the pool.
IdDisposeAndNil(LYarn);
end;
end;
function TTgSvrScheduler.AcquireYarn: TIdYarn;
begin
Result := NewYarn(NewThread);
ActiveYarns.Add(Result);
end;
type
TIdYarnOfThreadAccess = class(TIdYarnOfThread)
end;
procedure TTgSvrScheduler.ReleaseYarn(AYarn: TIdYarn);
//only gets called from YarnOf(Fiber/Thread).Destroy
var
LThread: TIdThreadWithTask;
begin
//take posession of the thread
LThread := TIdYarnOfThread(AYarn).Thread;
TIdYarnOfThreadAccess(AYarn).FThread := nil;
//Currently LThread can =nil. Is that a valid condition?
//Assert(LThread<>nil);
// inherited removes from ActiveYarns list
inherited ReleaseYarn(AYarn);
if LThread <> nil then begin
// need to destroy the thread
LThread.Yarn := nil; // Yarn is being destroyed, de-couple it from the thread
LThread.Terminate;
// RLebeau - ReleaseYarn() can be called in the context of
// the yarn's thread (when TIdThread.Cleanup() destroys the
// yarn between connnections), so have to check which context
// we're in here so as not to deadlock the thread!
if IsCurrentThread(LThread) then begin
LThread.FreeOnTerminate := True;
end else begin
{$IFDEF DEPRECATED_TThread_SuspendResume}
LThread.Suspended := False;
{$ELSE}
LThread.Resume;
{$ENDIF}
LThread.WaitFor;
LThread.Free;
end;
end;
end;
procedure TTgSvrScheduler.InitComponent;
begin
inherited InitComponent;
FThreadPriority := tpNormal;
FMaxThreads := 0;
FThreadClass := TIdThreadWithTask;
end;
procedure TTgSvrScheduler.TerminateAllYarns;
var
i: Integer;
LList: TIdYarnList;
Yarn: TIdYarn;
begin
Assert(FActiveYarns<>nil);
while True do begin
// Must unlock each time to allow yarns that are terminating to remove themselves from the list
LList := FActiveYarns.LockList;
try
if LList.Count = 0 then begin
Break;
end;
for i := LList.Count - 1 downto 0 do
begin
{$IFDEF HAS_GENERICS_TList}
Yarn := LList.Items[i];
{$ELSE}
Yarn := TIdYarn(LList.Items[i]);
{$ENDIF}
TerminateYarn(Yarn);
// 이거 추가를 위해서 이렇게 상속에 재정의까지 함... 21_0331 10:17:03 sunk
// 델파이 업데이트 또는 다른 환경에서 빌드 시 여기서 오류 날 수 있음...
Yarn.Free;
// LList.Delete(i);
end;
finally
FActiveYarns.UnlockList;
end;
IndySleep(500); // Wait a bit before looping to prevent thrashing
end;
end;
{ TCrmServerBase }
Constructor TCrmServerBase.Create(nActivePort: Integer = 0);
begin
Inherited Create(nil);
OnConnect := _EventConnect;
OnDisconnect := _EventDisconnect;
OnException := _EventException;
OnStatus := _EventStatus;
OnExecute := _EventExecute;
nPort_ := nActivePort;
Active := false;
end;
Destructor TCrmServerBase.Destroy;
begin
SetActive(false);
Inherited Destroy;
end;
procedure TCrmServerBase.SetActive(AValue: Boolean);
var
QdScheduler: TTgSvrScheduler;
begin
try
if AValue then
begin
DefaultPort := nPort_;
dtActive_ := now;
if not Assigned(Scheduler) then
begin
QdScheduler := TTgSvrScheduler.Create(Self);
SetScheduler(QdScheduler);
FImplicitScheduler := True;
QdScheduler.Name := Name + 'kkuScheduler'; {do not localize}
QdScheduler.Init;
end;
end else begin
dtActive_ := 0;
end;
Inherited;
except
on E: Exception do
ETgException.TraceException(Self, E, 'Fail .. SetActive()')
end;
end;
procedure TCrmServerBase.SetPort(nVal: Integer);
begin
if Active then
raise Exception.Create('Not available when the server is operational.');
nPort_ := nVal;
end;
procedure TCrmServerBase._Trace(const str: String);
begin
try
{$IFDEF TRACE_OBJ}
TTgTrace.T(Format('%s :: %s', [ClassName, str]));
{$ENDIF}
except
end;
end;
procedure TCrmServerBase._Trace(const sFormat: String; const Args: array of const);
begin
try
{$IFDEF TRACE_OBJ}
TTgTrace.T(Format('%s :: %s', [ClassName, sFormat]), Args);
{$ENDIF}
except
end;
end;
// 촙을 날려서 산놈, 죽어있는넘들을 건드린다.
// 이미 뻗은넘은 10053예외를 뱉으며 죽어갈꺼다.
// 살아있는놈은 무시하겠징
//procedure TCrmServerBase.TimerChop(Sender: TObject);
//var
// CtxList : TList;
// ctx: TIdContext;
// i: Integer;
//begin
// CtxList := Contexts.LockList;
// try
// for i := 0 to CtxList.Count - 1 do
// begin
// ctx := CtxList[i];
// if ctx.Connection.Connected then
// ctx.Connection.Socket.WriteLn('chop');
// end;
// finally
// Contexts.UnlockList;
// end;
//end;
//procedure TCrmServerBase.SendPacket(AContext: TIdContext; sSend: String);
//var
// Bytes: TIdBytes;
//begin
// try
// aSend.ReadySend;
// Bytes := aSend.GetBytes;
// AContext.Connection.Socket.Write(Bytes, Length(Bytes));
// except
// on E: Exception do
// ESunkException.TraceException(Self, Format('Fail .. SendPacket(), IP=%s', [AContext.Connection.Socket.Binding.PeerIP]), E);
// end;
//end;
//
//procedure TCrmServerBase.SendPacket(AContext: TIdContext; nCmd: Integer);
//var
// Send: TQdSendPacket;
//begin
// Safer(Send, TQdSendPacket.Create(nCmd));
// SendPacket(AContext, Send);
//end;
procedure TCrmServerBase._EventConnect(AContext: TIdContext);
begin
ASSERT(AContext.Data = nil);
AContext.Connection.Socket.ReadTimeout := 2000;
if Assigned(fnConnect_) then
fnConnect_(AContext);
end;
procedure TCrmServerBase._EventGetClientInfo(AContext: TIdContext);
begin
end;
procedure TCrmServerBase._EventDisconnect(AContext: TIdContext);
begin
if Assigned(fnDisconnect_) then
fnDisconnect_(AContext);
if (AContext.Connection.IOHandler <> nil) and AContext.Connection.IOHandler.Opened then
AContext.Connection.IOHandler.Close;
end;
procedure TCrmServerBase._EventExecute(AContext: TIdContext);
var
sRcv: AnsiString;
nRcvLen, nPos: Integer;
pBuf: TIdBytes;
begin
if Active then
begin
try
if not AContext.Connection.Socket.Connected then
exit;
nRcvLen := AContext.Connection.Socket.InputBuffer.Size;
if nRcvLen <= 0 then
begin
Sleep(500);
exit;
end;
AContext.Connection.Socket.ReadBytes(pBuf, nRcvLen, false);
if Length(pBuf) > 0 then
begin
sRcv := AnsiString(PAnsiChar(@pBuf[0]));
for nPos := 1 to Length(sRcv) do
if sRcv[nPos] = #10 then
break;
// nPos := Pos(#10, sRcv); // 음... 이게 안먹힌다..? 22_0420 09:58:10 kku
SetLength(sRcv, nPos - 1);
_ProcessRcv(AContext, sRcv);
// 데이터 전송 시 일회성 연결이기 때문에 바로 끊어준다.
// 계속 연결 유지하면 안됨 22_0420 10:17:39 kku
AContext.Connection.Disconnect;
end;
except
// 클라이언트에서 접속 종료중에도 먼가를 보내는데.. 그때마다 계속 예외생기네..
// 그냥 이렇게 먹어버리면 깔끔하게 해결되지만 뭔가 찝찝
on E: EIdSocketError do
begin
Case e.LastError of
10053, // 접속 비정상 종료
10054:
begin
ETgException.TraceException(Self, E, 'Fail .. SocketError');
if (AContext.Connection.IOHandler <> nil) and AContext.Connection.IOHandler.Opened then
AContext.Connection.IOHandler.Close;
// _EventDisconnect(AContext);
end;
End;
end;
on E: EIdNotConnected do
exit;
on E: Exception do
begin
if not (E is EIdConnClosedGracefully) then
ETgException.TraceException(Self, E, 'Fail .. _EventExecute()');
end;
end;
end;
end;
procedure TCrmServerBase._EventException(AContext: TIdContext; AException: Exception);
begin
_Trace('Exception - %s', [AException.Message]);
end;
procedure TCrmServerBase._EventStatus(ASender: TObject; const AStatus: TIdStatus;
const AStatusText: string);
begin
_Trace('Status - %s', [AStatusText]);
end;
{ TThdCrmVwClientBase }
Constructor TThdCrmVwClientBase.Create;
begin
Inherited Create;
sHost_ := '';
nPort_ := 0;
bConnected_ := false;
Client_ := TIdTCPClient.Create(nil);
Client_.ConnectTimeout := 3000;
Client_.ReadTimeout := 3000;
end;
Destructor TThdCrmVwClientBase.Destroy;
begin
FreeAndNil(Client_);
Inherited;
end;
function TThdCrmVwClientBase.Connect(const sHost: String; nPort: Integer): Boolean;
var
nOldCTO,
nOldRTO: Integer;
begin
Result := true;
if not GetConnected then
begin
with Client_ do
begin
Host := sHost;
Port := nPort;
nOldCTO := ConnectTimeout;
nOldRTO := ReadTimeout;
ConnectTimeout := 3000;
ReadTimeout := 3000;
try
try
Client_.Connect;
Socket.RecvBufferSize := 32 * 1024;
Socket.SendBufferSize := 32 * 1024;
sHost_ := sHost;
nPort_ := nPort;
Result := true;
SetConnected(true);
except
on E: EIdSocketError do
begin
if E.LastError <> 10061 then
ETgException.TraceException(Self, E, 'Fail .. Connect()');
Result := false;
SetConnected(false);
exit;
end;
on E: Exception do
begin
ETgException.TraceException(Self, E, 'Fail .. Connect()');
Result := false;
SetConnected(false);
exit;
end;
end;
finally
ReadTimeout := nOldRTO;
ConnectTimeout := nOldCTO;
end;
end;
end;
end;
function TThdCrmVwClientBase.GetConnected: Boolean;
begin
Lock;
try
Result := bConnected_;
finally
Unlock;
end;
end;
procedure TThdCrmVwClientBase.SetConnected(bVal: Boolean);
begin
Lock;
try
bConnected_ := bVal;
finally
Unlock;
end;
end;
function TThdCrmVwClientBase._ProcessRcv: Boolean;
begin
try
if not Client_.Socket.Readable(1) and
(Client_.Socket.InputBuffer.Size = 0) then exit;
Client_.Socket.ReadLn;
except
on E: Exception do
ETgException.TraceException(Self, E, 'Fail .. _ProcessRcv()');
end;
end;
//function TThdCrmVwClientBase._ProcessSend(Send: ISendPacket): Boolean;
//begin
//
//end;
procedure TThdCrmVwClientBase.Execute;
begin
while not Terminated and not GetWorkStop do
begin
try
if GetConnected then
begin
_ProcessRcv;
end else Sleep(1000);
except
on E: Exception do
ETgException.TraceException(Self, E, 'Fail .. Execute()');
end;
end;
end;
end.