{*******************************************************}
{                                                       }
{       CrmSocketBase                                   }
{                                                       }
{       Copyright (C) 2022 kku                          }
{                                                       }
{*******************************************************}

unit CrmSocketBase;

interface

uses
  Tocsg.Thread, System.Classes, System.SysUtils,
  IdScheduler, IdGlobal, IdThread, IdSchedulerOfThread, IdYarn, IdTask,
  IdTCPServer, IdCustomTCPServer, IdContext, IdComponent,
  IdExceptionCore, IdResourceStringsCore, IdException, IdTCPClient,
  Winapi.Windows, Winapi.Messages;

type
  // Setting TIdTCPServer.Active := false can hang inside
  // TIdScheduler.TerminateAllYarns().
  // It looks like an Indy bug with no known fix, so the thread scheduler is
  // re-implemented here instead.
  // (Only TerminateAllYarns() is the part that is meant to behave differently.)
  TTgSvrScheduler = class(TIdScheduler)
  protected
    FMaxThreads: Integer;                   // 0 = no limit (see NewThread)
    FThreadPriority: TIdThreadPriority;
    FThreadClass: TIdThreadWithTaskClass;   // class instantiated by NewThread
    procedure InitComponent; override;
  public
    destructor Destroy; override;
    function NewThread: TIdThreadWithTask; virtual;
    function NewYarn(AThread: TIdThreadWithTask = nil): TIdYarnOfThread;
    procedure StartYarn(AYarn: TIdYarn; ATask: TIdTask); override;
    procedure TerminateYarn(AYarn: TIdYarn); override;
    function AcquireYarn: TIdYarn; override;
    procedure ReleaseYarn(AYarn: TIdYarn); override;
    procedure TerminateAllYarns; override;
    property ThreadClass: TIdThreadWithTaskClass read FThreadClass write FThreadClass;
  published
    property MaxThreads: Integer read FMaxThreads write FMaxThreads;
    property ThreadPriority: TIdThreadPriority read FThreadPriority write FThreadPriority default tpNormal;
  end;

  // TCP server base class. It wires the Indy server events to its own
  // _EventXxx handlers and installs a TTgSvrScheduler when activated without
  // an explicit scheduler. Descendants implement _ProcessRcv.
  TCrmServerBase = class(TIdTCPServer)
  private
    dtActive_: TDateTime;   // set to Now when activation is requested, 0 otherwise
    nPort_: Integer;        // copied to DefaultPort when the server is activated
  protected
    fnConnect_,
    fnDisconnect_: TIdServerThreadEvent;
    procedure SetPort(nVal: Integer);
    procedure _Trace(const str: String); overload;
    procedure _Trace(const sFormat: String; const Args: array of const); overload;
    procedure _EventExecute(AContext: TIdContext); virtual;
    procedure _EventConnect(AContext: TIdContext); virtual;
    procedure _EventGetClientInfo(AContext: TIdContext); virtual;
    procedure _EventDisconnect(AContext: TIdContext); virtual;
    procedure _EventException(AContext: TIdContext; AException: Exception);
    procedure _EventStatus(ASender: TObject; const AStatus: TIdStatus; const AStatusText: string);
    procedure _ProcessRcv(AContext: TIdContext; sRcv: String); virtual; abstract;
//    procedure TimerChop(Sender: TObject);
    procedure SetActive(AValue: Boolean); override;
  public
    Constructor Create(nActivePort: Integer = 0);
    Destructor Destroy; override;
//    procedure SendPacket(AContext: TIdContext; sSend: String); overload;
//    procedure SendPacket(AContext: TIdContext; nCmd: Integer); overload;
    property Port: Integer read nPort_ write SetPort;
  published
    property OnClientConnect: TIdServerThreadEvent write fnConnect_;
    property OnClientDisconnect: TIdServerThreadEvent write fnDisconnect_;
    property ActiveDateTime: TDateTime read dtActive_;
  end;

  // Worker thread that owns a TIdTCPClient. Descendants implement ProcessRcv.
  TThdCrmVwClientBase = class(TTgThread)
  protected
    sHost_: String;
    nPort_: Integer;
    Client_: TIdTCPClient;
    bConnected_: Boolean;   // accessed through GetConnected / SetConnected
    function GetConnected: Boolean;
    procedure SetConnected(bVal: Boolean);
    function _ProcessRcv: Boolean;
//    function _ProcessSend(Send: ISendPacket): Boolean; virtual;
    procedure ProcessRcv(sRcv: String); virtual; abstract;
    procedure Execute; override;
  public
    Constructor Create;
    Destructor Destroy; override;
    function Connect(const sHost: String; nPort: Integer): Boolean;
  end;

implementation

uses
  Tocsg.Exception, IdStack, Tocsg.Trace;

{ TTgSvrScheduler }

// Terminates every yarn that is still active before the scheduler goes away.
destructor TTgSvrScheduler.Destroy;
begin
  TerminateAllYarns;
  inherited Destroy;
end;

// Hands ATask to the yarn's thread and starts that thread.
procedure TTgSvrScheduler.StartYarn(AYarn: TIdYarn; ATask: TIdTask);
var
  LThread: TIdThreadWithTask;
begin
  LThread := TIdYarnOfThread(AYarn).Thread;
  LThread.Task := ATask;
  LThread.Start;
end;

// Creates a new worker thread of class ThreadClass.
// Raises EIdSchedulerMaxThreadsExceeded when MaxThreads is non-zero and the
// number of active yarns has already reached it.
function TTgSvrScheduler.NewThread: TIdThreadWithTask;
begin
  Assert(FThreadClass<>nil);
  if (FMaxThreads <> 0) and (not ActiveYarns.IsCountLessThan(FMaxThreads + 1)) then begin
    raise EIdSchedulerMaxThreadsExceeded.Create(RSchedMaxThreadEx);
  end;
  Result := FThreadClass.Create(nil, IndyFormat('%s User', [Name])); {do not localize}
  if ThreadPriority <> tpNormal then begin
    IndySetThreadPriority(Result, ThreadPriority);
  end;
end;

// Wraps AThread in a new yarn owned by this scheduler.
// Raises EIdException when AThread is nil.
function TTgSvrScheduler.NewYarn(AThread: TIdThreadWithTask): TIdYarnOfThread;
begin
  if not Assigned(AThread) then begin
    raise EIdException.Create(RSThreadSchedulerThreadRequired);
  end;
  // Create Yarn
  Result := TIdYarnOfThread.Create(Self, AThread);
end;

// Stops the yarn's thread when it is running; otherwise (no thread, or a
// thread that is still suspended) disposes of the yarn itself.
procedure TTgSvrScheduler.TerminateYarn(AYarn: TIdYarn);
var
  LYarn: TIdYarnOfThread;
  LThread: TIdThreadWithTask;
begin
  Assert(AYarn<>nil);
  LYarn := TIdYarnOfThread(AYarn);
  LThread := LYarn.Thread;
  if (LThread <> nil) and (not LThread.Suspended) then begin
    // Is still running and will free itself
    LThread.Stop;
    // Dont free the yarn. The thread frees it (IdThread.pas)
  end else
  begin
    // If suspended, was created but never started
    // ie waiting on connection accept

    // RLebeau: free the yarn here as well. This allows TIdSchedulerOfThreadPool
    // to put the suspended thread, if present, back in the pool.
    IdDisposeAndNil(LYarn);
  end;
end;

// Creates a fresh thread + yarn pair and registers the yarn as active.
function TTgSvrScheduler.AcquireYarn: TIdYarn;
begin
  Result := NewYarn(NewThread);
  ActiveYarns.Add(Result);
end;

type
  // Access class used only to reach the FThread field of TIdYarnOfThread.
  TIdYarnOfThreadAccess = class(TIdYarnOfThread)
  end;

// Detaches the thread from the yarn, removes the yarn from the active list
// and then terminates and frees the thread.
procedure TTgSvrScheduler.ReleaseYarn(AYarn: TIdYarn);
//only gets called from YarnOf(Fiber/Thread).Destroy
var
  LThread: TIdThreadWithTask;
begin
  //take posession of the thread
  LThread := TIdYarnOfThread(AYarn).Thread;
  TIdYarnOfThreadAccess(AYarn).FThread := nil;
  //Currently LThread can =nil. Is that a valid condition?
  //Assert(LThread<>nil);

  // inherited removes from ActiveYarns list
  inherited ReleaseYarn(AYarn);

  if LThread <> nil then begin
    // need to destroy the thread
    LThread.Yarn := nil; // Yarn is being destroyed, de-couple it from the thread
    LThread.Terminate;
    // RLebeau - ReleaseYarn() can be called in the context of
    // the yarn's thread (when TIdThread.Cleanup() destroys the
    // yarn between connnections), so have to check which context
    // we're in here so as not to deadlock the thread!
    if IsCurrentThread(LThread) then begin
      LThread.FreeOnTerminate := True;
    end else begin
      // NOTE(review): DEPRECATED_TThread_SuspendResume is not defined anywhere
      // in this unit (no include file is pulled in) - confirm it is supplied
      // by the project options, otherwise the $ELSE branch is what compiles.
      {$IFDEF DEPRECATED_TThread_SuspendResume}
      LThread.Suspended := False;
      {$ELSE}
      LThread.Resume;
      {$ENDIF}
      LThread.WaitFor;
      LThread.Free;
    end;
  end;
end;

// Sets the defaults: normal priority, no thread limit, TIdThreadWithTask.
procedure TTgSvrScheduler.InitComponent;
begin
  inherited InitComponent;
  FThreadPriority := tpNormal;
  FMaxThreads := 0;
  FThreadClass := TIdThreadWithTask;
end;

// Terminates every active yarn and, for yarns whose thread is running, also
// frees the yarn right away instead of waiting for the thread to do it.
// Loops (sleeping 500 ms between passes) until the active list is empty.
procedure TTgSvrScheduler.TerminateAllYarns;
var
  i: Integer;
  LList: TIdYarnList;
  Yarn: TIdYarn;
  LThread: TIdThreadWithTask;
  bDisposedByTerminate: Boolean;
begin
  Assert(FActiveYarns<>nil);

  while True do begin
    // Must unlock each time to allow yarns that are terminating to remove themselves from the list
    LList := FActiveYarns.LockList;
    try
      if LList.Count = 0 then begin
        Break;
      end;
      for i := LList.Count - 1 downto 0 do begin
        // NOTE(review): HAS_GENERICS_TList is not defined anywhere in this
        // unit - confirm it comes from the project options. The $ELSE branch
        // (explicit cast) is what compiles otherwise.
        {$IFDEF HAS_GENERICS_TList}
        Yarn := LList.Items[i];
        {$ELSE}
        Yarn := TIdYarn(LList.Items[i]);
        {$ENDIF}

        // TerminateYarn() disposes of the yarn itself when the thread is
        // missing or still suspended. Remember that *before* the call so the
        // yarn is not freed a second time below.
        // (Mirrors the condition in TTgSvrScheduler.TerminateYarn; a
        // descendant overriding TerminateYarn must keep the two in sync.)
        LThread := TIdYarnOfThread(Yarn).Thread;
        bDisposedByTerminate := (LThread = nil) or LThread.Suspended;

        TerminateYarn(Yarn);

        // The whole subclass/override exists just to add this call...
        // 21_0331 10:17:03 sunk
        // This may break after a Delphi update or when built in a different
        // environment...
        if not bDisposedByTerminate then
          Yarn.Free;
//        LList.Delete(i);
      end;
    finally
      FActiveYarns.UnlockList;
    end;
    IndySleep(500); // Wait a bit before looping to prevent thrashing
  end;
end;

{ TCrmServerBase }

// Creates an inactive server without owner and hooks all Indy server events
// to the _EventXxx handlers. nActivePort is stored and applied on activation.
Constructor TCrmServerBase.Create(nActivePort: Integer = 0);
begin
  Inherited Create(nil);
  OnConnect := _EventConnect;
  OnDisconnect := _EventDisconnect;
  OnException := _EventException;
  OnStatus := _EventStatus;
  OnExecute := _EventExecute;
  nPort_ := nActivePort;
  Active := false;
end;

// Deactivates the server before destroying it.
Destructor TCrmServerBase.Destroy;
begin
  SetActive(false);
  Inherited Destroy;
end;

// Activates or deactivates the server.
// On activation the stored port becomes DefaultPort and, if no scheduler is
// assigned, an implicit TTgSvrScheduler is created and installed.
// Any exception is traced and swallowed, so callers never see a failure here.
procedure TCrmServerBase.SetActive(AValue: Boolean);
var
  QdScheduler: TTgSvrScheduler;
begin
  try
    if AValue then
    begin
      DefaultPort := nPort_;
      dtActive_ := now;
      if not Assigned(Scheduler) then
      begin
        QdScheduler := TTgSvrScheduler.Create(Self);
        SetScheduler(QdScheduler);
        FImplicitScheduler := True;
        QdScheduler.Name := Name + 'kkuScheduler'; {do not localize}
        QdScheduler.Init;
      end;
    end else
    begin
      dtActive_ := 0;
    end;

    Inherited;
  except
    on E: Exception do
      ETgException.TraceException(Self, E, 'Fail .. SetActive()')
  end;
end;

// Changes the port used on the next activation.
// Raises Exception while the server is active.
procedure TCrmServerBase.SetPort(nVal: Integer);
begin
  if Active then
    raise Exception.Create('Not available when the server is operational.');

  nPort_ := nVal;
end;

// Writes "<ClassName> :: <str>" to the trace output.
// Only emits when TRACE_OBJ is defined; exceptions are swallowed on purpose.
procedure TCrmServerBase._Trace(const str: String);
begin
  try
    {$IFDEF TRACE_OBJ}
    TTgTrace.T(Format('%s :: %s', [ClassName, str]));
    {$ENDIF}
  except
  end;
end;

// Formatted variant: prefixes sFormat with the class name and passes it,
// together with Args, to the trace output.
// Only emits when TRACE_OBJ is defined; exceptions are swallowed on purpose.
procedure TCrmServerBase._Trace(const sFormat: String; const Args: array of const);
begin
  try
    {$IFDEF TRACE_OBJ}
    TTgTrace.T(Format('%s :: %s', [ClassName, sFormat]), Args);
    {$ENDIF}
  except
  end;
end;

// Send a "chop" to poke both the live and the dead connections.
// The ones that are already dead will die off raising a 10053 exception.
// The live ones will simply ignore it.
//procedure TCrmServerBase.TimerChop(Sender: TObject);
//var
//  CtxList : TList;
//  ctx: TIdContext;
//  i: Integer;
//begin
//  CtxList := Contexts.LockList;
//  try
//    for i := 0 to CtxList.Count - 1 do
//    begin
//      ctx := CtxList[i];
//      if ctx.Connection.Connected then
//        ctx.Connection.Socket.WriteLn('chop');
//    end;
//  finally
//    Contexts.UnlockList;
//  end;
//end;

//procedure TCrmServerBase.SendPacket(AContext: TIdContext; sSend: String);
//var
//  Bytes: TIdBytes;
//begin
//  try
//    aSend.ReadySend;
//    Bytes := aSend.GetBytes;
//    AContext.Connection.Socket.Write(Bytes, Length(Bytes));
//  except
//    on E: Exception do
//      ESunkException.TraceException(Self, Format('Fail .. SendPacket(), IP=%s', [AContext.Connection.Socket.Binding.PeerIP]), E);
//  end;
//end;
//
//procedure TCrmServerBase.SendPacket(AContext: TIdContext; nCmd: Integer);
//var
//  Send: TQdSendPacket;
//begin
//  Safer(Send, TQdSendPacket.Create(nCmd));
//  SendPacket(AContext, Send);
//end;

// Connect handler: applies a 2000 ms read timeout to the new connection and
// forwards the event to OnClientConnect when one is assigned.
procedure TCrmServerBase._EventConnect(AContext: TIdContext);
begin
  ASSERT(AContext.Data = nil);

  AContext.Connection.Socket.ReadTimeout := 2000;
  if Assigned(fnConnect_) then
    fnConnect_(AContext);
end;

// Empty hook; descendants may override it.
procedure TCrmServerBase._EventGetClientInfo(AContext: TIdContext);
begin

end;

// Disconnect handler: forwards the event to OnClientDisconnect when one is
// assigned, then closes the IOHandler if it is still open.
procedure TCrmServerBase._EventDisconnect(AContext: TIdContext);
begin
  if Assigned(fnDisconnect_) then
    fnDisconnect_(AContext);

  if (AContext.Connection.IOHandler <> nil) and
     AContext.Connection.IOHandler.Opened then
    AContext.Connection.IOHandler.Close;
end;

// Execute handler, run for each connection while the server is active.
// Reads everything currently buffered, cuts it at the first NUL or LF byte,
// passes that text to _ProcessRcv and then disconnects the peer.
// When nothing is buffered it sleeps 500 ms and returns.
// Socket errors 10053/10054 are traced and the IOHandler is closed; other
// socket errors and EIdNotConnected are ignored; remaining exceptions are
// traced unless they are EIdConnClosedGracefully.
procedure TCrmServerBase._EventExecute(AContext: TIdContext);
var
  sRcv: AnsiString;
  nRcvLen, nLen: Integer;
  pBuf: TIdBytes;
begin
  if Active then
  begin
    try
      if not AContext.Connection.Socket.Connected then
        exit;

      nRcvLen := AContext.Connection.Socket.InputBuffer.Size;
      if nRcvLen <= 0 then
      begin
        Sleep(500);
        exit;
      end;

      AContext.Connection.Socket.ReadBytes(pBuf, nRcvLen, false);
      if Length(pBuf) > 0 then
      begin
        // pBuf is NOT NUL-terminated, so it must never be treated as a
        // C string. Scan inside the buffer bounds for the first NUL or LF
        // (the same cut points the old PAnsiChar conversion + LF search
        // produced) and take the whole buffer when neither is present.
        nLen := 0;
        while (nLen < Length(pBuf)) and (pBuf[nLen] <> 0) and (pBuf[nLen] <> 10) do
          Inc(nLen);
        SetString(sRcv, PAnsiChar(@pBuf[0]), nLen);

        _ProcessRcv(AContext, String(sRcv));

        // A data transfer is a one-shot connection, so disconnect right away.
        // The connection must not be kept open. 22_0420 10:17:39 kku
        AContext.Connection.Disconnect;
      end;
    except
      // The client keeps sending something even while it is disconnecting,
      // and every time that raises an exception..
      // Swallowing it like this solves it neatly, but it feels a bit uneasy.
      on E: EIdSocketError do
      begin
        Case e.LastError of
          10053,  // connection aborted abnormally
          10054:
            begin
              ETgException.TraceException(Self, E, 'Fail .. SocketError');
              if (AContext.Connection.IOHandler <> nil) and
                 AContext.Connection.IOHandler.Opened then
                AContext.Connection.IOHandler.Close;
//              _EventDisconnect(AContext);
            end;
        End;
      end;
      on E: EIdNotConnected do
        exit;
      on E: Exception do
      begin
        if not (E is EIdConnClosedGracefully) then
          ETgException.TraceException(Self, E, 'Fail .. _EventExecute()');
      end;
    end;
  end;
end;

// Exception handler: traces the exception message.
procedure TCrmServerBase._EventException(AContext: TIdContext; AException: Exception);
begin
  _Trace('Exception - %s', [AException.Message]);
end;

// Status handler: traces the status text.
procedure TCrmServerBase._EventStatus(ASender: TObject; const AStatus: TIdStatus; const AStatusText: string);
begin
  _Trace('Status - %s', [AStatusText]);
end;

{ TThdCrmVwClientBase }

// Creates the thread together with its TCP client (3000 ms connect and read
// timeouts). No connection is made here; see Connect.
Constructor TThdCrmVwClientBase.Create;
begin
  Inherited Create;
  sHost_ := '';
  nPort_ := 0;
  bConnected_ := false;
  Client_ := TIdTCPClient.Create(nil);
  Client_.ConnectTimeout := 3000;
  Client_.ReadTimeout := 3000;
end;

// Frees the TCP client, then the thread.
Destructor TThdCrmVwClientBase.Destroy;
begin
  FreeAndNil(Client_);
  Inherited;
end;

// Connects the client to sHost:nPort unless the connected flag is already set.
// Returns true when already flagged as connected or when the connect
// succeeded, false when it failed. Failures are traced (socket error 10061 is
// not traced) and never raised to the caller.
// The client's previous connect/read timeouts are restored afterwards.
function TThdCrmVwClientBase.Connect(const sHost: String; nPort: Integer): Boolean;
var
  nOldCTO, nOldRTO: Integer;
begin
  Result := true;

  if not GetConnected then
  begin
    Client_.Host := sHost;
    Client_.Port := nPort;
    nOldCTO := Client_.ConnectTimeout;
    nOldRTO := Client_.ReadTimeout;
    Client_.ConnectTimeout := 3000;
    Client_.ReadTimeout := 3000;
    try
      try
        Client_.Connect;
        Client_.Socket.RecvBufferSize := 32 * 1024;
        Client_.Socket.SendBufferSize := 32 * 1024;
        sHost_ := sHost;
        nPort_ := nPort;
        Result := true;
        SetConnected(true);
      except
        on E: EIdSocketError do
        begin
          if E.LastError <> 10061 then
            ETgException.TraceException(Self, E, 'Fail .. Connect()');
          Result := false;
          SetConnected(false);
          exit;
        end;
        on E: Exception do
        begin
          ETgException.TraceException(Self, E, 'Fail .. Connect()');
          Result := false;
          SetConnected(false);
          exit;
        end;
      end;
    finally
      // Runs on the exit paths above as well.
      Client_.ReadTimeout := nOldRTO;
      Client_.ConnectTimeout := nOldCTO;
    end;
  end;
end;

// Reads the connected flag under the thread's lock.
function TThdCrmVwClientBase.GetConnected: Boolean;
begin
  Lock;
  try
    Result := bConnected_;
  finally
    Unlock;
  end;
end;

// Writes the connected flag under the thread's lock.
procedure TThdCrmVwClientBase.SetConnected(bVal: Boolean);
begin
  Lock;
  try
    bConnected_ := bVal;
  finally
    Unlock;
  end;
end;

// Reads one line from the socket when data is available.
// Returns true when a read was performed, false when nothing was available
// or the read raised (the exception is traced, not propagated).
// NOTE(review): the line returned by ReadLn is discarded and the abstract
// ProcessRcv is never called from here - confirm whether it should be.
function TThdCrmVwClientBase._ProcessRcv: Boolean;
begin
  // Result used to be left unassigned on every path.
  Result := false;
  try
    if not Client_.Socket.Readable(1) and
       (Client_.Socket.InputBuffer.Size = 0) then
      exit;

    Client_.Socket.ReadLn;
    Result := true;
  except
    on E: Exception do
      ETgException.TraceException(Self, E, 'Fail .. _ProcessRcv()');
  end;
end;

//function TThdCrmVwClientBase._ProcessSend(Send: ISendPacket): Boolean;
//begin
//
//end;

// Thread body: while neither terminated nor asked to stop, polls the socket
// when flagged as connected and otherwise sleeps for one second.
procedure TThdCrmVwClientBase.Execute;
begin
  while not Terminated and not GetWorkStop do
  begin
    try
      if GetConnected then
      begin
        _ProcessRcv;
      end else
        Sleep(1000);
    except
      on E: Exception do
        ETgException.TraceException(Self, E, 'Fail .. Execute()');
    end;
  end;
end;

end.