619 lines
16 KiB
Plaintext
619 lines
16 KiB
Plaintext
{*******************************************************}
|
|
{ }
|
|
{ CrmSocketBase }
|
|
{ }
|
|
{ Copyright (C) 2022 kku }
|
|
{ }
|
|
{*******************************************************}
|
|
|
|
unit CrmSocketBase;
|
|
|
|
interface
|
|
|
|
uses
|
|
Tocsg.Thread, System.Classes, System.SysUtils, IdScheduler, IdGlobal,
|
|
IdThread, IdSchedulerOfThread, IdYarn, IdTask, IdTCPServer,
|
|
IdCustomTCPServer, IdContext, IdComponent, IdExceptionCore,
|
|
IdResourceStringsCore, IdException, IdTCPClient, Winapi.Windows,
|
|
Winapi.Messages;
|
|
|
|
type
|
|
// TIdTCPServer.Active := false 시 TIdScheduler.TerminateAllYarns() 에서 행이 걸리는 문제가 있다..
|
|
// 인디 버그 인거 같은데 해결 방법이 없어서 이렇게 재구현 하는걸로 처리함...
|
|
// TerminateAllYarns() 만 깔끔하게
|
|
TTgSvrScheduler = class(TIdScheduler)
|
|
protected
|
|
FMaxThreads: Integer;
|
|
FThreadPriority: TIdThreadPriority;
|
|
FThreadClass: TIdThreadWithTaskClass;
|
|
procedure InitComponent; override;
|
|
public
|
|
destructor Destroy; override;
|
|
function NewThread: TIdThreadWithTask; virtual;
|
|
function NewYarn(AThread: TIdThreadWithTask = nil): TIdYarnOfThread;
|
|
procedure StartYarn(AYarn: TIdYarn; ATask: TIdTask); override;
|
|
procedure TerminateYarn(AYarn: TIdYarn); override;
|
|
|
|
function AcquireYarn: TIdYarn; override;
|
|
procedure ReleaseYarn(AYarn: TIdYarn); override;
|
|
|
|
procedure TerminateAllYarns; override;
|
|
|
|
property ThreadClass: TIdThreadWithTaskClass read FThreadClass write FThreadClass;
|
|
published
|
|
property MaxThreads: Integer read FMaxThreads write FMaxThreads;
|
|
property ThreadPriority: TIdThreadPriority read FThreadPriority write FThreadPriority default tpNormal;
|
|
end;
|
|
|
|
TCrmServerBase = class(TIdTCPServer)
|
|
private
|
|
dtActive_: TDateTime;
|
|
nPort_: Integer;
|
|
protected
|
|
fnConnect_,
|
|
fnDisconnect_: TIdServerThreadEvent;
|
|
|
|
procedure SetPort(nVal: Integer);
|
|
|
|
procedure _Trace(const str: String); overload;
|
|
procedure _Trace(const sFormat: String; const Args: array of const); overload;
|
|
|
|
procedure _EventExecute(AContext: TIdContext); virtual;
|
|
procedure _EventConnect(AContext: TIdContext); virtual;
|
|
procedure _EventGetClientInfo(AContext: TIdContext); virtual;
|
|
procedure _EventDisconnect(AContext: TIdContext); virtual;
|
|
procedure _EventException(AContext: TIdContext; AException: Exception);
|
|
procedure _EventStatus(ASender: TObject; const AStatus: TIdStatus;
|
|
const AStatusText: string);
|
|
|
|
procedure _ProcessRcv(AContext: TIdContext; sRcv: String); virtual; abstract;
|
|
|
|
// procedure TimerChop(Sender: TObject);
|
|
procedure SetActive(AValue: Boolean); override;
|
|
public
|
|
Constructor Create(nActivePort: Integer = 0);
|
|
Destructor Destroy; override;
|
|
|
|
// procedure SendPacket(AContext: TIdContext; sSend: String); overload;
|
|
// procedure SendPacket(AContext: TIdContext; nCmd: Integer); overload;
|
|
|
|
property Port: Integer read nPort_ write SetPort;
|
|
published
|
|
property OnClientConnect: TIdServerThreadEvent write fnConnect_;
|
|
property OnClientDisconnect: TIdServerThreadEvent write fnDisconnect_;
|
|
|
|
property ActiveDateTime: TDateTime read dtActive_;
|
|
end;
|
|
|
|
TThdCrmVwClientBase = class(TTgThread)
|
|
protected
|
|
sHost_: String;
|
|
nPort_: Integer;
|
|
Client_: TIdTCPClient;
|
|
bConnected_: Boolean;
|
|
function GetConnected: Boolean;
|
|
procedure SetConnected(bVal: Boolean);
|
|
function _ProcessRcv: Boolean;
|
|
// function _ProcessSend(Send: ISendPacket): Boolean; virtual;
|
|
|
|
procedure ProcessRcv(sRcv: String); virtual; abstract;
|
|
|
|
procedure Execute; override;
|
|
public
|
|
Constructor Create;
|
|
Destructor Destroy; override;
|
|
|
|
function Connect(const sHost: String; nPort: Integer): Boolean;
|
|
end;
|
|
|
|
implementation
|
|
|
|
uses
|
|
Tocsg.Exception, IdStack, Tocsg.Trace;
|
|
|
|
{ TTgSvrScheduler }
|
|
|
|
destructor TTgSvrScheduler.Destroy;
|
|
begin
|
|
TerminateAllYarns;
|
|
inherited Destroy;
|
|
end;
|
|
|
|
procedure TTgSvrScheduler.StartYarn(AYarn: TIdYarn; ATask: TIdTask);
|
|
var
|
|
LThread: TIdThreadWithTask;
|
|
begin
|
|
LThread := TIdYarnOfThread(AYarn).Thread;
|
|
LThread.Task := ATask;
|
|
LThread.Start;
|
|
end;
|
|
|
|
function TTgSvrScheduler.NewThread: TIdThreadWithTask;
|
|
begin
|
|
Assert(FThreadClass<>nil);
|
|
if (FMaxThreads <> 0) and (not ActiveYarns.IsCountLessThan(FMaxThreads + 1)) then begin
|
|
raise EIdSchedulerMaxThreadsExceeded.Create(RSchedMaxThreadEx);
|
|
end;
|
|
Result := FThreadClass.Create(nil, IndyFormat('%s User', [Name])); {do not localize}
|
|
if ThreadPriority <> tpNormal then begin
|
|
IndySetThreadPriority(Result, ThreadPriority);
|
|
end;
|
|
end;
|
|
|
|
function TTgSvrScheduler.NewYarn(AThread: TIdThreadWithTask): TIdYarnOfThread;
|
|
begin
|
|
if not Assigned(AThread) then begin
|
|
raise EIdException.Create(RSThreadSchedulerThreadRequired);
|
|
end;
|
|
// Create Yarn
|
|
Result := TIdYarnOfThread.Create(Self, AThread);
|
|
end;
|
|
|
|
procedure TTgSvrScheduler.TerminateYarn(AYarn: TIdYarn);
|
|
var
|
|
LYarn: TIdYarnOfThread;
|
|
LThread: TIdThreadWithTask;
|
|
begin
|
|
Assert(AYarn<>nil);
|
|
LYarn := TIdYarnOfThread(AYarn);
|
|
LThread := LYarn.Thread;
|
|
if (LThread <> nil) and (not LThread.Suspended) then begin
|
|
// Is still running and will free itself
|
|
LThread.Stop;
|
|
// Dont free the yarn. The thread frees it (IdThread.pas)
|
|
end else
|
|
begin
|
|
// If suspended, was created but never started
|
|
// ie waiting on connection accept
|
|
|
|
// RLebeau: free the yarn here as well. This allows TIdSchedulerOfThreadPool
|
|
// to put the suspended thread, if present, back in the pool.
|
|
|
|
IdDisposeAndNil(LYarn);
|
|
end;
|
|
end;
|
|
|
|
function TTgSvrScheduler.AcquireYarn: TIdYarn;
|
|
begin
|
|
Result := NewYarn(NewThread);
|
|
ActiveYarns.Add(Result);
|
|
end;
|
|
|
|
type
|
|
TIdYarnOfThreadAccess = class(TIdYarnOfThread)
|
|
end;
|
|
|
|
procedure TTgSvrScheduler.ReleaseYarn(AYarn: TIdYarn);
|
|
//only gets called from YarnOf(Fiber/Thread).Destroy
|
|
var
|
|
LThread: TIdThreadWithTask;
|
|
begin
|
|
//take posession of the thread
|
|
LThread := TIdYarnOfThread(AYarn).Thread;
|
|
TIdYarnOfThreadAccess(AYarn).FThread := nil;
|
|
//Currently LThread can =nil. Is that a valid condition?
|
|
//Assert(LThread<>nil);
|
|
|
|
// inherited removes from ActiveYarns list
|
|
inherited ReleaseYarn(AYarn);
|
|
|
|
if LThread <> nil then begin
|
|
// need to destroy the thread
|
|
LThread.Yarn := nil; // Yarn is being destroyed, de-couple it from the thread
|
|
LThread.Terminate;
|
|
// RLebeau - ReleaseYarn() can be called in the context of
|
|
// the yarn's thread (when TIdThread.Cleanup() destroys the
|
|
// yarn between connnections), so have to check which context
|
|
// we're in here so as not to deadlock the thread!
|
|
if IsCurrentThread(LThread) then begin
|
|
LThread.FreeOnTerminate := True;
|
|
end else begin
|
|
{$IFDEF DEPRECATED_TThread_SuspendResume}
|
|
LThread.Suspended := False;
|
|
{$ELSE}
|
|
LThread.Resume;
|
|
{$ENDIF}
|
|
LThread.WaitFor;
|
|
LThread.Free;
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
procedure TTgSvrScheduler.InitComponent;
|
|
begin
|
|
inherited InitComponent;
|
|
FThreadPriority := tpNormal;
|
|
FMaxThreads := 0;
|
|
FThreadClass := TIdThreadWithTask;
|
|
end;
|
|
|
|
procedure TTgSvrScheduler.TerminateAllYarns;
|
|
var
|
|
i: Integer;
|
|
LList: TIdYarnList;
|
|
Yarn: TIdYarn;
|
|
begin
|
|
Assert(FActiveYarns<>nil);
|
|
|
|
while True do begin
|
|
// Must unlock each time to allow yarns that are terminating to remove themselves from the list
|
|
LList := FActiveYarns.LockList;
|
|
try
|
|
if LList.Count = 0 then begin
|
|
Break;
|
|
end;
|
|
for i := LList.Count - 1 downto 0 do
|
|
begin
|
|
{$IFDEF HAS_GENERICS_TList}
|
|
Yarn := LList.Items[i];
|
|
{$ELSE}
|
|
Yarn := TIdYarn(LList.Items[i]);
|
|
{$ENDIF}
|
|
TerminateYarn(Yarn);
|
|
|
|
// 이거 추가를 위해서 이렇게 상속에 재정의까지 함... 21_0331 10:17:03 sunk
|
|
// 델파이 업데이트 또는 다른 환경에서 빌드 시 여기서 오류 날 수 있음...
|
|
Yarn.Free;
|
|
// LList.Delete(i);
|
|
end;
|
|
finally
|
|
FActiveYarns.UnlockList;
|
|
end;
|
|
|
|
IndySleep(500); // Wait a bit before looping to prevent thrashing
|
|
end;
|
|
end;
|
|
|
|
{ TCrmServerBase }
|
|
|
|
Constructor TCrmServerBase.Create(nActivePort: Integer = 0);
|
|
begin
|
|
Inherited Create(nil);
|
|
|
|
OnConnect := _EventConnect;
|
|
OnDisconnect := _EventDisconnect;
|
|
OnException := _EventException;
|
|
OnStatus := _EventStatus;
|
|
OnExecute := _EventExecute;
|
|
|
|
nPort_ := nActivePort;
|
|
|
|
Active := false;
|
|
end;
|
|
|
|
Destructor TCrmServerBase.Destroy;
|
|
begin
|
|
SetActive(false);
|
|
Inherited Destroy;
|
|
end;
|
|
|
|
procedure TCrmServerBase.SetActive(AValue: Boolean);
|
|
var
|
|
QdScheduler: TTgSvrScheduler;
|
|
begin
|
|
try
|
|
if AValue then
|
|
begin
|
|
DefaultPort := nPort_;
|
|
dtActive_ := now;
|
|
|
|
if not Assigned(Scheduler) then
|
|
begin
|
|
QdScheduler := TTgSvrScheduler.Create(Self);
|
|
SetScheduler(QdScheduler);
|
|
FImplicitScheduler := True;
|
|
QdScheduler.Name := Name + 'kkuScheduler'; {do not localize}
|
|
QdScheduler.Init;
|
|
end;
|
|
end else begin
|
|
dtActive_ := 0;
|
|
end;
|
|
|
|
Inherited;
|
|
except
|
|
on E: Exception do
|
|
ETgException.TraceException(Self, E, 'Fail .. SetActive()')
|
|
end;
|
|
end;
|
|
|
|
procedure TCrmServerBase.SetPort(nVal: Integer);
|
|
begin
|
|
if Active then
|
|
raise Exception.Create('Not available when the server is operational.');
|
|
|
|
nPort_ := nVal;
|
|
end;
|
|
|
|
procedure TCrmServerBase._Trace(const str: String);
|
|
begin
|
|
try
|
|
{$IFDEF TRACE_OBJ}
|
|
TTgTrace.T(Format('%s :: %s', [ClassName, str]));
|
|
{$ENDIF}
|
|
except
|
|
|
|
end;
|
|
end;
|
|
|
|
procedure TCrmServerBase._Trace(const sFormat: String; const Args: array of const);
|
|
begin
|
|
try
|
|
{$IFDEF TRACE_OBJ}
|
|
TTgTrace.T(Format('%s :: %s', [ClassName, sFormat]), Args);
|
|
{$ENDIF}
|
|
except
|
|
|
|
end;
|
|
end;
|
|
|
|
// 촙을 날려서 산놈, 죽어있는넘들을 건드린다.
|
|
// 이미 뻗은넘은 10053예외를 뱉으며 죽어갈꺼다.
|
|
// 살아있는놈은 무시하겠징
|
|
//procedure TCrmServerBase.TimerChop(Sender: TObject);
|
|
//var
|
|
// CtxList : TList;
|
|
// ctx: TIdContext;
|
|
// i: Integer;
|
|
//begin
|
|
// CtxList := Contexts.LockList;
|
|
// try
|
|
// for i := 0 to CtxList.Count - 1 do
|
|
// begin
|
|
// ctx := CtxList[i];
|
|
// if ctx.Connection.Connected then
|
|
// ctx.Connection.Socket.WriteLn('chop');
|
|
// end;
|
|
// finally
|
|
// Contexts.UnlockList;
|
|
// end;
|
|
//end;
|
|
|
|
//procedure TCrmServerBase.SendPacket(AContext: TIdContext; sSend: String);
|
|
//var
|
|
// Bytes: TIdBytes;
|
|
//begin
|
|
// try
|
|
// aSend.ReadySend;
|
|
// Bytes := aSend.GetBytes;
|
|
// AContext.Connection.Socket.Write(Bytes, Length(Bytes));
|
|
// except
|
|
// on E: Exception do
|
|
// ESunkException.TraceException(Self, Format('Fail .. SendPacket(), IP=%s', [AContext.Connection.Socket.Binding.PeerIP]), E);
|
|
// end;
|
|
//end;
|
|
//
|
|
//procedure TCrmServerBase.SendPacket(AContext: TIdContext; nCmd: Integer);
|
|
//var
|
|
// Send: TQdSendPacket;
|
|
//begin
|
|
// Safer(Send, TQdSendPacket.Create(nCmd));
|
|
// SendPacket(AContext, Send);
|
|
//end;
|
|
|
|
procedure TCrmServerBase._EventConnect(AContext: TIdContext);
|
|
begin
|
|
ASSERT(AContext.Data = nil);
|
|
|
|
AContext.Connection.Socket.ReadTimeout := 2000;
|
|
if Assigned(fnConnect_) then
|
|
fnConnect_(AContext);
|
|
end;
|
|
|
|
procedure TCrmServerBase._EventGetClientInfo(AContext: TIdContext);
|
|
begin
|
|
|
|
end;
|
|
|
|
procedure TCrmServerBase._EventDisconnect(AContext: TIdContext);
|
|
begin
|
|
if Assigned(fnDisconnect_) then
|
|
fnDisconnect_(AContext);
|
|
|
|
if (AContext.Connection.IOHandler <> nil) and AContext.Connection.IOHandler.Opened then
|
|
AContext.Connection.IOHandler.Close;
|
|
end;
|
|
|
|
procedure TCrmServerBase._EventExecute(AContext: TIdContext);
|
|
var
|
|
sRcv: AnsiString;
|
|
nRcvLen, nPos: Integer;
|
|
pBuf: TIdBytes;
|
|
begin
|
|
if Active then
|
|
begin
|
|
try
|
|
if not AContext.Connection.Socket.Connected then
|
|
exit;
|
|
|
|
nRcvLen := AContext.Connection.Socket.InputBuffer.Size;
|
|
if nRcvLen <= 0 then
|
|
begin
|
|
Sleep(500);
|
|
exit;
|
|
end;
|
|
|
|
AContext.Connection.Socket.ReadBytes(pBuf, nRcvLen, false);
|
|
if Length(pBuf) > 0 then
|
|
begin
|
|
sRcv := AnsiString(PAnsiChar(@pBuf[0]));
|
|
for nPos := 1 to Length(sRcv) do
|
|
if sRcv[nPos] = #10 then
|
|
break;
|
|
// nPos := Pos(#10, sRcv); // 음... 이게 안먹힌다..? 22_0420 09:58:10 kku
|
|
SetLength(sRcv, nPos - 1);
|
|
_ProcessRcv(AContext, sRcv);
|
|
|
|
// 데이터 전송 시 일회성 연결이기 때문에 바로 끊어준다.
|
|
// 계속 연결 유지하면 안됨 22_0420 10:17:39 kku
|
|
AContext.Connection.Disconnect;
|
|
end;
|
|
except
|
|
// 클라이언트에서 접속 종료중에도 먼가를 보내는데.. 그때마다 계속 예외생기네..
|
|
// 그냥 이렇게 먹어버리면 깔끔하게 해결되지만 뭔가 찝찝
|
|
on E: EIdSocketError do
|
|
begin
|
|
Case e.LastError of
|
|
10053, // 접속 비정상 종료
|
|
10054:
|
|
begin
|
|
ETgException.TraceException(Self, E, 'Fail .. SocketError');
|
|
if (AContext.Connection.IOHandler <> nil) and AContext.Connection.IOHandler.Opened then
|
|
AContext.Connection.IOHandler.Close;
|
|
// _EventDisconnect(AContext);
|
|
end;
|
|
End;
|
|
end;
|
|
|
|
on E: EIdNotConnected do
|
|
exit;
|
|
|
|
on E: Exception do
|
|
begin
|
|
if not (E is EIdConnClosedGracefully) then
|
|
ETgException.TraceException(Self, E, 'Fail .. _EventExecute()');
|
|
end;
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
procedure TCrmServerBase._EventException(AContext: TIdContext; AException: Exception);
|
|
begin
|
|
_Trace('Exception - %s', [AException.Message]);
|
|
end;
|
|
|
|
procedure TCrmServerBase._EventStatus(ASender: TObject; const AStatus: TIdStatus;
|
|
const AStatusText: string);
|
|
begin
|
|
_Trace('Status - %s', [AStatusText]);
|
|
end;
|
|
|
|
{ TThdCrmVwClientBase }
|
|
|
|
Constructor TThdCrmVwClientBase.Create;
|
|
begin
|
|
Inherited Create;
|
|
sHost_ := '';
|
|
nPort_ := 0;
|
|
bConnected_ := false;
|
|
Client_ := TIdTCPClient.Create(nil);
|
|
Client_.ConnectTimeout := 3000;
|
|
Client_.ReadTimeout := 3000;
|
|
end;
|
|
|
|
Destructor TThdCrmVwClientBase.Destroy;
|
|
begin
|
|
FreeAndNil(Client_);
|
|
Inherited;
|
|
end;
|
|
|
|
function TThdCrmVwClientBase.Connect(const sHost: String; nPort: Integer): Boolean;
|
|
var
|
|
nOldCTO,
|
|
nOldRTO: Integer;
|
|
begin
|
|
Result := true;
|
|
|
|
if not GetConnected then
|
|
begin
|
|
with Client_ do
|
|
begin
|
|
Host := sHost;
|
|
Port := nPort;
|
|
|
|
nOldCTO := ConnectTimeout;
|
|
nOldRTO := ReadTimeout;
|
|
ConnectTimeout := 3000;
|
|
ReadTimeout := 3000;
|
|
try
|
|
try
|
|
Client_.Connect;
|
|
|
|
Socket.RecvBufferSize := 32 * 1024;
|
|
Socket.SendBufferSize := 32 * 1024;
|
|
|
|
sHost_ := sHost;
|
|
nPort_ := nPort;
|
|
Result := true;
|
|
SetConnected(true);
|
|
except
|
|
on E: EIdSocketError do
|
|
begin
|
|
if E.LastError <> 10061 then
|
|
ETgException.TraceException(Self, E, 'Fail .. Connect()');
|
|
Result := false;
|
|
SetConnected(false);
|
|
exit;
|
|
end;
|
|
|
|
on E: Exception do
|
|
begin
|
|
ETgException.TraceException(Self, E, 'Fail .. Connect()');
|
|
Result := false;
|
|
SetConnected(false);
|
|
exit;
|
|
end;
|
|
end;
|
|
finally
|
|
ReadTimeout := nOldRTO;
|
|
ConnectTimeout := nOldCTO;
|
|
end;
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
function TThdCrmVwClientBase.GetConnected: Boolean;
|
|
begin
|
|
Lock;
|
|
try
|
|
Result := bConnected_;
|
|
finally
|
|
Unlock;
|
|
end;
|
|
end;
|
|
|
|
procedure TThdCrmVwClientBase.SetConnected(bVal: Boolean);
|
|
begin
|
|
Lock;
|
|
try
|
|
bConnected_ := bVal;
|
|
finally
|
|
Unlock;
|
|
end;
|
|
end;
|
|
|
|
function TThdCrmVwClientBase._ProcessRcv: Boolean;
|
|
begin
|
|
try
|
|
if not Client_.Socket.Readable(1) and
|
|
(Client_.Socket.InputBuffer.Size = 0) then exit;
|
|
|
|
Client_.Socket.ReadLn;
|
|
except
|
|
on E: Exception do
|
|
ETgException.TraceException(Self, E, 'Fail .. _ProcessRcv()');
|
|
end;
|
|
end;
|
|
|
|
//function TThdCrmVwClientBase._ProcessSend(Send: ISendPacket): Boolean;
|
|
//begin
|
|
//
|
|
//end;
|
|
|
|
procedure TThdCrmVwClientBase.Execute;
|
|
begin
|
|
while not Terminated and not GetWorkStop do
|
|
begin
|
|
try
|
|
if GetConnected then
|
|
begin
|
|
_ProcessRcv;
|
|
end else Sleep(1000);
|
|
except
|
|
on E: Exception do
|
|
ETgException.TraceException(Self, E, 'Fail .. Execute()');
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
end.
|