View Issue Details

IDProjectCategoryView StatusLast Update
0031805FPCFCLpublic2018-02-10 01:08
ReporterLars(L505)Assigned ToMichael Van Canneyt 
PrioritynormalSeverityminorReproducibilityalways
Status resolvedResolutionfixed 
Platform32 bitOSWindowsOS Version7
Product Version3.0.2Product Build 
Target Version3.2.0Fixed in Version3.1.1 
Summary0031805: Simple IPC stopserver hangs app when threaded
DescriptionAm I just dense today or is this a bug? I can't figure out why the application hangs when I stop the simpleipc server.

A simple test case is below with two lazarus buttons on it. Click the first button, everything is okay. Click the second button to stop the server and it hangs. Only if threaded is set to true. A command line program could be built to do similar but would require reading the keyboard events.
Steps To Reproduceuses simpleipc;

//...

var srv: TSimpleIpcServer;

procedure TForm1.Button1Click(Sender: TObject);
begin
  srv := TSimpleIpcServer.Create(nil);
  srv.Global := true;
  srv.ServerID := '123abc';
  srv.StartServer(true);
end;

// when calling this, it causes the app to hang
procedure TForm1.Button2Click(Sender: TObject);
begin
  srv.StopServer;
  // srv.free; // or this
end;
Additional InformationTried it in Lazarus 1.6.4
But if this is is a bug it is an fpc bug not lazarus.
But hopefully I'm missing something obvious?
If it is something obvious, should StopServer() check for issues before attempting to stop? It appears to go into a deadlock
Tagspatch
Fixed in Revision36915
FPCOldBugId
FPCTarget
Attached Files
  • bugdemo.zip (129,127 bytes)
  • simpleipc.pp.patch (28,214 bytes)
    Index: packages/fcl-process/src/simpleipc.pp
    ===================================================================
    --- packages/fcl-process/src/simpleipc.pp	(revision 36715)
    +++ packages/fcl-process/src/simpleipc.pp	(working copy)
    @@ -20,13 +20,12 @@
     interface
     
     uses
    -  Contnrs, Classes, SysUtils;
    +  Contnrs, SyncObjs, Classes, SysUtils;
     
    -Const
    +const
       MsgVersion = 1;
    -  DefaultThreadTimeOut = 50;
     
    -  //Message types
    +  { IPC message types }
       mtUnknown = 0;
       mtString = 1;
     
    @@ -33,13 +32,8 @@
     type
       TIPCMessageOverflowAction = (ipcmoaNone, ipcmoaDiscardOld, ipcmoaDiscardNew, ipcmoaError);
     
    -var
    -  DefaultIPCMessageOverflowAction: TIPCMessageOverflowAction = ipcmoaNone;
    -  DefaultIPCMessageQueueLimit: Integer = 0;
    +  TMessageType = LongInt;
     
    -Type
    -
    -  TMessageType = LongInt;
       TMsgHeader = Packed record
         Version : Byte;
         MsgType : TMessageType;
    @@ -49,17 +43,29 @@
       TSimpleIPCServer = class;
       TSimpleIPCClient = class;
     
    +  { TIPCServerMsg }
       TIPCServerMsg = class
    +  private type
    +    TStreamClass = class of TStream;
    +  private const
    +    // TMemoryStream uses an effecient grow algorithm.
    +    DefaultStreamClass: TStreamClass = TMemoryStream;
       strict private
         FStream: TStream;
    +    FOwnsStream: Boolean;
         FMsgType: TMessageType;
    +    function GetStringMessage: String;
       public
         constructor Create;
    +    constructor Create(AStream: TStream; AOwnsStream: Boolean = True);
         destructor Destroy; override;
         property Stream: TStream read FStream;
         property MsgType: TMessageType read FMsgType write FMsgType;
    +    property OwnsStream: Boolean read FOwnsStream write FOwnsStream;
    +    property StringMessage: String read GetStringMessage;
       end;
     
    +  { TIPCServerMsgQueue }
       TIPCServerMsgQueue = class
       strict private
         FList: TFPObjectList;
    @@ -80,7 +86,6 @@
       end;
     
       { TIPCServerComm }
    -  
       TIPCServerComm = Class(TObject)
       Private
         FOwner  : TSimpleIPCServer;
    @@ -111,6 +116,7 @@
         FBusy: Boolean;
         FActive : Boolean;
         FServerID : String;
    +    procedure PrepareServerID;
         Procedure DoError(const Msg: String; const Args: array of const);
         Procedure CheckInactive;
         Procedure CheckActive;
    @@ -123,26 +129,42 @@
         Property ServerID : String Read FServerID Write SetServerID;
       end;
     
    +  TMessageQueueEvent = Procedure(Sender: TObject; Msg: TIPCServerMsg) of object;
    +
       { TSimpleIPCServer }
    -
    -  TMessageQueueEvent = Procedure(Sender : TObject; Msg : TIPCServerMsg) of object;
    -
       TSimpleIPCServer = Class(TSimpleIPC)
    -  protected
    -  Private
    +  private const
    +    DefaultThreaded = False;
    +    DefaultThreadTimeout = 50;
    +    DefaultSynchronizeEvents = True;
    +    DefaultMaxAction = ipcmoaNone;
    +    DefaultMaxQueue = 0;
    +  private
         FOnMessageError: TMessageQueueEvent;
         FOnMessageQueued: TNotifyEvent;
    -    FQueue : TIPCServerMsgQueue;
    +    FOnMessage: TNotifyEvent;
    +    FOnThreadError: TNotifyEvent;
    +    FQueue: TIPCServerMsgQueue;
         FGlobal: Boolean;
    -    FOnMessage: TNotifyEvent;
    -    FMsgType: TMessageType;
    -    FMsgData : TStream;
    -    FThreadTimeOut: Integer;
    -    FThread : TThread;
    -    FLock : TRTLCriticalSection;
    -    FErrMsg : TIPCServerMsg;
    -    procedure DoMessageQueued;
    -    procedure DoMessageError;
    +    // Access to the message is not locked by design, because in the threaded
    +    // mode it should be accessed only during synchronous event callbacks.
    +    FMessage: TIPCServerMsg;
    +    FTempMessage: TIPCServerMsg;
    +    FThreaded: Boolean;
    +    FThreadTimeout: Integer;
    +    FThreadError: String;
    +    FThreadExecuting: Boolean;
    +    FThreadReadyEvent: TSimpleEvent;
    +    FThread: TThread;
    +    FSynchronizeEvents: Boolean;
    +    procedure DoOnMessage;
    +    procedure DoOnMessageQueued;
    +    procedure DoOnMessageError(Msg: TIPCServerMsg);
    +    procedure DoOnThreadError;
    +    procedure InternalDoOnMessage;
    +    procedure InternalDoOnMessageQueued;
    +    procedure InternalDoOnMessageError;
    +    procedure InternalDoOnThreadError;
         function GetInstanceID: String;
         function GetMaxAction: TIPCMessageOverflowAction;
         function GetMaxQueue: Integer;
    @@ -150,13 +172,23 @@
         procedure SetGlobal(const AValue: Boolean);
         procedure SetMaxAction(AValue: TIPCMessageOverflowAction);
         procedure SetMaxQueue(AValue: Integer);
    -  Protected
    +    procedure SetThreaded(AValue: Boolean);
    +    procedure SetThreadTimeout(AValue: Integer);
    +    procedure SetSynchronizeEvents(AValue: Boolean);
    +    procedure CheckThreadContext;
    +    function WaitForReady(Timeout: Cardinal): Boolean;
    +    function WaitForReady: Boolean;
    +    function GetMsgType: TMessageType;
    +    function GetMsgData: TStream;
    +  protected
         FIPCComm: TIPCServerComm;
    -    procedure StartThread; virtual;
    -    procedure StopThread; virtual;
         Function CommClass : TIPCServerCommClass; virtual;
         Procedure PushMessage(Msg : TIPCServerMsg); virtual;
         function PopMessage: Boolean; virtual;
    +    procedure StartComm; virtual;
    +    procedure StopComm; virtual;
    +    function StartThread: Boolean; virtual;
    +    procedure StopThread; virtual;
         Procedure Activate; override;
         Procedure Deactivate; override;
         Property Queue : TIPCServerMsgQueue Read FQueue;
    @@ -164,17 +196,20 @@
       Public
         Constructor Create(AOwner : TComponent); override;
         Destructor Destroy; override;
    -    Procedure StartServer(Threaded : Boolean = False);
    +    Procedure StartServer;
    +    Procedure StartServer(AThreaded: Boolean);
         Procedure StopServer;
         Function PeekMessage(TimeOut : Integer; DoReadMessage : Boolean): Boolean;
         Procedure ReadMessage;
         Property  StringMessage : String Read GetStringMessage;
         Procedure GetMessageData(Stream : TStream);
    -    Property  MsgType: TMessageType Read FMsgType;
    -    Property  MsgData : TStream Read FMsgData;
    +    Property  Message: TIPCServerMsg read FMessage;
    +    Property  MsgType: TMessageType Read GetMsgType;
    +    Property  MsgData: TStream Read GetMsgData;
         Property  InstanceID : String Read GetInstanceID;
    +    property  ThreadExecuting: Boolean read FThreadExecuting;
    +    property  ThreadError: String read FThreadError;
       Published
    -    Property ThreadTimeOut : Integer Read FThreadTimeOut Write FThreadTimeOut;
         Property Global : Boolean Read FGlobal Write SetGlobal;
         // Called during ReadMessage
         Property OnMessage : TNotifyEvent Read FOnMessage Write FOnMessage;
    @@ -182,14 +217,21 @@
         Property OnMessageQueued : TNotifyEvent Read FOnMessageQueued Write FOnMessageQueued;
         // Called when the queue overflows and  MaxAction = ipcmoaError.
         Property OnMessageError : TMessageQueueEvent Read FOnMessageError Write FOnMessageError;
    +    // Called when the server thread catches an exception.
    +    property OnThreadError: TNotifyEvent read FOnThreadError write FOnThreadError;
         // Maximum number of messages to keep in the queue
    -    property MaxQueue: Integer read GetMaxQueue write SetMaxQueue;
    +    property MaxQueue: Integer read GetMaxQueue write SetMaxQueue default DefaultMaxQueue;
         // What to do when the queue overflows
    -    property MaxAction: TIPCMessageOverflowAction read GetMaxAction write SetMaxAction;
    +    property MaxAction: TIPCMessageOverflowAction read GetMaxAction write SetMaxAction default DefaultMaxAction;
    +    // Instruct IPC server to operate in a threaded mode.
    +    property Threaded: Boolean read FThreaded write SetThreaded;
    +    // Amount of time thread waits for a message before checking for termination.
    +    property ThreadTimeout: Integer read FThreadTimeout write SetThreadTimeout default DefaultThreadTimeout;
    +    // Synchronize events with the main thread when in threaded mode.
    +    property SynchronizeEvents: Boolean read FSynchronizeEvents write SetSynchronizeEvents default DefaultSynchronizeEvents;
       end;
     
    -
    -  { TIPCClientComm}
    +  { TIPCClientComm }
       TIPCClientComm = Class(TObject)
       private
         FOwner: TSimpleIPCClient;
    @@ -229,17 +271,23 @@
         Property  ServerInstance : String Read FServerInstance Write SetServerInstance;
       end;
     
    -
       EIPCError = Class(Exception);
     
    -Var
    +var
       DefaultIPCServerClass : TIPCServerCommClass = Nil;
       DefaultIPCClientClass : TIPCClientCommClass = Nil;
     
    +var
    +  DefaultIPCMessageOverflowAction: TIPCMessageOverflowAction = TSimpleIPCServer.DefaultMaxAction;
    +  DefaultIPCMessageQueueLimit: Integer = TSimpleIPCServer.DefaultMaxQueue;
    +
     resourcestring
       SErrServerNotActive = 'Server with ID %s is not active.';
       SErrActive = 'This operation is illegal when the server is active.';
       SErrInActive = 'This operation is illegal when the server is inactive.';
    +  SErrThreadContext = 'This operation is illegal outside of IPC thread context.';
    +  SErrThreadFailure = 'IPC thread failure.';
    +  SErrMessageQueueOverflow = 'Message queue overflow (limit %s)';
     
     
     implementation
    @@ -252,35 +300,82 @@
       This comes first, to allow the uses clause to be set.
       If the include file defines OSNEEDIPCINITDONE then the unit will
       call IPCInit and IPCDone in the initialization/finalization code.
    -  
       --------------------------------------------------------------------- }
     {$UNDEF OSNEEDIPCINITDONE}
     
     {$i simpleipc.inc}
     
    -Resourcestring
    -  SErrMessageQueueOverflow = 'Message queue overflow (limit %s)';
    +// Convert content of any stream type to a string.
    +function FastStreamToString(Stream: TStream): String;
    +var
    +  CharCount, CharSize: Integer;
    +  StringStream: TStringStream;
    +  OldPosition: Int64;
    +begin
    +  // Optimized for TStringStream
    +  if Stream is TStringStream then
    +  begin
    +    Result := TStringStream(Stream).DataString;
    +  end
    +  // Optimized for TCustomMemoryStream
    +  else if Stream is TCustomMemoryStream then
    +  begin
    +    Result := '';
    +    CharSize := StringElementSize(Result);
    +    CharCount := Stream.Size div CharSize;
    +    SetLength(Result, CharCount);
    +    Move(TCustomMemoryStream(Stream).Memory^, Result[1], CharCount * CharSize);
    +  end
    +  // Any other stream type
    +  else
    +  begin
    +    OldPosition := Stream.Position;
    +    try
    +      StringStream := TStringStream.Create('');
    +      try
    +        Stream.Position := 0;
    +        StringStream.CopyFrom(Stream, Stream.Size);
    +        Result := StringStream.DataString;
    +      finally
    +        StringStream.Free;
    +      end;
    +    finally
    +      Stream.Position := OldPosition;
    +    end;
    +  end;
    +end;
     
    -{ ---------------------------------------------------------------------
    -    TIPCServerMsg
    -  ---------------------------------------------------------------------}
    +{$REGION 'TIPCServerMsg'}
     
    -
     constructor TIPCServerMsg.Create;
     begin
    -  FMsgType := 0;
    -  FStream := TMemoryStream.Create;
    +  FMsgType := mtUnknown;
    +  FStream := Self.DefaultStreamClass.Create;
    +  FOwnsStream := True;
     end;
     
    +constructor TIPCServerMsg.Create(AStream: TStream; AOwnsStream: Boolean);
    +begin
    +  FMsgType := mtUnknown;
    +  FStream := AStream;
    +  FOwnsStream := AOwnsStream;
    +end;
    +
     destructor TIPCServerMsg.Destroy;
     begin
    -  FStream.Free;
    +  if FOwnsStream then
    +    FreeAndNil(FStream);
     end;
     
    -{ ---------------------------------------------------------------------
    -    TIPCServerMsgQueue
    -  ---------------------------------------------------------------------}
    +function TIPCServerMsg.GetStringMessage: String;
    +begin
    +  Result := FastStreamToString(FStream);
    +end;
     
    +{$ENDREGION}
    +
    +{$REGION 'TIPCServerMsgQueue'}
    +
     constructor TIPCServerMsgQueue.Create;
     begin
       FMaxCount := DefaultIPCMessageQueueLimit;
    @@ -336,6 +431,7 @@
     
     procedure TIPCServerMsgQueue.Push(AItem: TIPCServerMsg);
     begin
    +  // PrepareToPush may throw an exception, e.g. if message queue is full.
       if PrepareToPush then
         FList.Insert(0, AItem);
     end;
    @@ -355,10 +451,9 @@
         Result := nil;
     end;
     
    +{$ENDREGION}
     
    -{ ---------------------------------------------------------------------
    -    TIPCServerComm
    -  ---------------------------------------------------------------------}
    +{$REGION 'TIPCServerComm'}
     
     constructor TIPCServerComm.Create(AOwner: TSimpleIPCServer);
     begin
    @@ -366,16 +461,13 @@
     end;
     
     procedure TIPCServerComm.DoError(const Msg: String; const Args: array of const);
    -
     begin
       FOwner.DoError(Msg,Args);
     end;
     
     procedure TIPCServerComm.PushMessage(const Hdr: TMsgHeader; AStream: TStream);
    -
    -Var
    +var
       M : TIPCServerMsg;
    -
     begin
       M:=TIPCServerMsg.Create;
       try
    @@ -394,9 +486,9 @@
       FOwner.PushMessage(Msg);
     end;
     
    -{ ---------------------------------------------------------------------
    -    TIPCClientComm
    -  ---------------------------------------------------------------------}
    +{$ENDREGION}
    +
    +{$REGION 'TIPCClientComm'}
       
     constructor TIPCClientComm.Create(AOwner: TSimpleIPCClient);
     begin
    @@ -409,10 +501,10 @@
       FOwner.DoError(Msg,Args);
     end;  
     
    -{ ---------------------------------------------------------------------
    -    TSimpleIPC
    -  ---------------------------------------------------------------------}
    +{$ENDREGION}
     
    +{$REGION 'TSimpleIPC'}
    +
     Procedure TSimpleIPC.DoError(const Msg: String; const Args: array of const);
     var
       FullMsg: String;
    @@ -441,7 +533,7 @@
     procedure TSimpleIPC.SetActive(const AValue: Boolean);
     begin
       if (FActive<>AValue) then
    -    begin
    +  begin
         if ([]<>([csLoading,csDesigning]*ComponentState)) then
           FActive:=AValue
         else  
    @@ -449,37 +541,90 @@
             Activate
           else
             Deactivate;
    -    end;
    +  end;
     end;
     
     procedure TSimpleIPC.SetServerID(const AValue: String);
     begin
       if (FServerID<>AValue) then
    -    begin
    +  begin
         CheckInactive;
    -    FServerID:=AValue
    -    end;
    +    FServerID:=AValue;
    +  end;
     end;
     
    -Procedure TSimpleIPC.Loaded; 
    +procedure TSimpleIPC.PrepareServerID;
    +begin
    +  if FServerID = '' then
    +    FServerID := ApplicationName;
    +  // Extra precaution for thread-safety
    +  UniqueString(FServerID);
    +end;
     
    -Var
    +procedure TSimpleIPC.Loaded;
    +var
       B : Boolean;
    -
     begin
    -  Inherited;
    +  inherited;
       B:=FActive;
       if B then
    -    begin
    -    Factive:=False;
    +  begin
    +    FActive:=False;
         Activate;
    +  end;
    +end;
    +
    +{$ENDREGION}
    +
    +{$REGION 'TIPCServerThread'}
    +
    +type
    +  TIPCServerThread = class(TThread)
    +  private
    +    FServer: TSimpleIPCServer;
    +  protected
    +    procedure Execute; override;
    +  public
    +    constructor Create(AServer: TSimpleIPCServer);
    +    property Server: TSimpleIPCServer read FServer;
    +  end;
    +
    +constructor TIPCServerThread.Create(AServer: TSimpleIPCServer);
    +begin
    +  inherited Create(True); // CreateSuspended = True
    +  FServer := AServer;
    +end;
    +
    +procedure TIPCServerThread.Execute;
    +begin
    +  FServer.FThreadExecuting := True;
    +  try
    +    FServer.StartComm;
    +    try
    +      // Notify server that thread has started.
    +      FServer.FThreadReadyEvent.SetEvent;
    +      // Run message loop
    +      while not Terminated do
    +        FServer.PeekMessage(FServer.ThreadTimeout, True);
    +    finally
    +      FServer.StopComm;
         end;
    +  except on E: Exception do
    +    begin
    +      FServer.FThreadExecuting := False;
    +      FServer.FThreadError := E.Message;
    +      // Trigger event to wake up the caller from potentially indefinite wait.
    +      FServer.FThreadReadyEvent.SetEvent;
    +      FServer.DoOnThreadError;
    +    end;
    +  end;
    +  FServer.FThreadExecuting := False;
     end;
     
    -{ ---------------------------------------------------------------------
    -    TSimpleIPCServer
    -  ---------------------------------------------------------------------}
    +{$ENDREGION}
     
    +{$REGION 'TSimpleIPCServer'}
    +
     constructor TSimpleIPCServer.Create(AOwner: TComponent);
     begin
       inherited Create(AOwner);
    @@ -486,9 +631,11 @@
       FGlobal:=False;
       FActive:=False;
       FBusy:=False;
    -  FMsgData:=TStringStream.Create('');
    +  FMessage:=nil;
       FQueue:=TIPCServerMsgQueue.Create;
    -  FThreadTimeOut:=DefaultThreadTimeOut;
    +  FThreaded:=DefaultThreaded;
    +  FThreadTimeout:=DefaultThreadTimeout;
    +  FSynchronizeEvents:=DefaultSynchronizeEvents;
     end;
     
     destructor TSimpleIPCServer.Destroy;
    @@ -495,26 +642,44 @@
     begin
       Active:=False;
       FreeAndNil(FQueue);
    -  FreeAndNil(FMsgData);
    +  if Assigned(FMessage) then
    +    FreeAndNil(FMessage);
       inherited Destroy;
     end;
     
     procedure TSimpleIPCServer.SetGlobal(const AValue: Boolean);
     begin
    -  if (FGlobal<>AValue) then
    -    begin
    -    CheckInactive;
    -    FGlobal:=AValue;
    -    end;
    +  CheckInactive;
    +  FGlobal:=AValue;
     end;
     
    +procedure TSimpleIPCServer.SetThreaded(AValue: Boolean);
    +begin
    +  CheckInactive;
    +  FThreaded:=AValue;
    +end;
    +
    +procedure TSimpleIPCServer.SetThreadTimeout(AValue: Integer);
    +begin
    +  CheckInactive;
    +  FThreadTimeout:=AValue;
    +end;
    +
    +procedure TSimpleIPCServer.SetSynchronizeEvents(AValue: Boolean);
    +begin
    +  CheckInactive;
    +  FSynchronizeEvents:=AValue;
    +end;
    +
     procedure TSimpleIPCServer.SetMaxAction(AValue: TIPCMessageOverflowAction);
     begin
    +  CheckInactive;
       FQueue.MaxAction:=AValue;
     end;
     
     procedure TSimpleIPCServer.SetMaxQueue(AValue: Integer);
     begin
    +  CheckInactive;
       FQueue.MaxCount:=AValue;
     end;
     
    @@ -533,86 +698,98 @@
       Result:=FQueue.MaxCount;
     end;
     
    +procedure TSimpleIPCServer.CheckThreadContext;
    +begin
    +  // Check that the caller is in the IPC thread context.
    +  if Assigned(FThread) then
    +  begin
    +    if System.GetThreadID <> FThread.ThreadID then
    +      raise EIPCError.Create(SErrThreadContext);
    +  end;
    +end;
     
    -function TSimpleIPCServer.GetStringMessage: String;
    +procedure TSimpleIPCServer.StartComm;
     begin
    -  Result:=TStringStream(FMsgData).DataString;
    +  if Assigned(FIPCComm) then
    +    FreeAndNil(FIPCComm);
    +  FIPCComm := CommClass.Create(Self);
    +  FIPCComm.StartServer;
     end;
     
    +procedure TSimpleIPCServer.StopComm;
    +begin
    +  if Assigned(FIPCComm) then
    +  begin
    +    FIPCComm.StopServer;
    +    FreeAndNil(FIPCComm);
    +  end;
    +end;
     
    -procedure TSimpleIPCServer.StartServer(Threaded : Boolean = False);
    +function TSimpleIPCServer.StartThread: Boolean;
     begin
    -  if Not Assigned(FIPCComm) then
    -    begin
    -    If (FServerID='') then
    -      FServerID:=ApplicationName;
    -    FIPCComm:=CommClass.Create(Self);
    -    FIPCComm.StartServer;
    -    end;
    -  FActive:=True;
    -  If Threaded then
    -    StartThread;
    +  FThreadError := '';
    +  FThreadReadyEvent := SyncObjs.TSimpleEvent.Create;
    +  FThread := TIPCServerThread.Create(Self);
    +  FThread.Start;
    +  Result := WaitForReady;
     end;
     
    -Type
    -
    -  { TServerThread }
    -
    -  TServerThread = Class(TThread)
    -  private
    -    FServer: TSimpleIPCServer;
    -    FThreadTimeout: Integer;
    -  Public
    -    Constructor Create(AServer : TSimpleIPCServer; ATimeout : integer);
    -    procedure Execute; override;
    -    Property Server : TSimpleIPCServer Read FServer;
    -    Property ThreadTimeout : Integer Read FThreadTimeout;
    +procedure TSimpleIPCServer.StopThread;
    +begin
    +  if Assigned(FThread) then
    +  begin
    +    FThread.Terminate;
    +    FThread.WaitFor;
    +    FreeAndNil(FThread);
       end;
    +  if Assigned(FThreadReadyEvent) then
    +    FreeAndNil(FThreadReadyEvent);
    +end;
     
    -{ TServerThread }
    -
    -constructor TServerThread.Create(AServer: TSimpleIPCServer; ATimeout: integer);
    +function TSimpleIPCServer.WaitForReady(Timeout: Cardinal): Boolean;
     begin
    -  FServer:=AServer;
    -  FThreadTimeout:=ATimeOut;
    -  Inherited Create(False);
    +  if FThreadReadyEvent.WaitFor(Timeout) = wrSignaled then
    +    Result := FThreadExecuting
    +  else
    +    Result := False;
     end;
     
    -procedure TServerThread.Execute;
    +function TSimpleIPCServer.WaitForReady: Boolean;
     begin
    -  While Not Terminated do
    -    FServer.PeekMessage(ThreadTimeout,False);
    +  Result := WaitForReady(SyncObjs.INFINITE);
     end;
     
    -procedure TSimpleIPCServer.StartThread;
    -
    +procedure TSimpleIPCServer.StartServer;
     begin
    -  InitCriticalSection(FLock);
    -  FThread:=TServerThread.Create(Self,ThreadTimeOut);
    +  StartServer(FThreaded);
     end;
     
    -procedure TSimpleIPCServer.StopThread;
    -
    +procedure TSimpleIPCServer.StartServer(AThreaded: Boolean);
     begin
    -  if Assigned(FThread) then
    +  CheckInactive;
    +  FActive := True;
    +  try
    +    PrepareServerID;
    +    FThreaded := AThreaded;
    +    if FThreaded then
         begin
    -    FThread.Terminate;
    -    FThread.WaitFor;
    -    FreeAndNil(FThread);
    -    DoneCriticalSection(FLock);
    -    end;
    +      if not StartThread then
    +        raise EIPCError.Create(SErrThreadFailure);
    +    end
    +    else
    +      StartComm;
    +  except
    +    FActive := False;
    +    raise;
    +  end;
     end;
     
     procedure TSimpleIPCServer.StopServer;
     begin
       StopThread;
    -  If Assigned(FIPCComm) then
    -    begin
    -    FIPCComm.StopServer;
    -    FreeAndNil(FIPCComm);
    -    end;
    +  StopComm;
       FQueue.Clear;
    -  FActive:=False;
    +  FActive := False;
     end;
     
     // TimeOut values:
    @@ -622,10 +799,11 @@
     //   < -1  -- wait infinitely (force to -1)
     function TSimpleIPCServer.PeekMessage(TimeOut: Integer; DoReadMessage: Boolean): Boolean;
     begin
    +  CheckThreadContext;
       CheckActive;
       Result:=Queue.Count>0;
       If Not Result then
    -    begin
    +  begin
         if TimeOut < -1 then
           TimeOut := -1;
         FBusy:=True;
    @@ -634,7 +812,7 @@
         Finally
           FBusy:=False;
         end;
    -    end;
    +  end;
       If Result then
         If DoReadMessage then
           Readmessage;
    @@ -641,55 +819,83 @@
     end;
     
     function TSimpleIPCServer.PopMessage: Boolean;
    -
    -var
    -  MsgItem: TIPCServerMsg;
    -  DoLock : Boolean;
    -
     begin
    -  DoLock:=Assigned(FThread);
    -  if DoLock then
    -    EnterCriticalsection(Flock);
    -  try
    -    MsgItem:=FQueue.Pop;
    -  finally
    -    if DoLock then
    -      LeaveCriticalsection(FLock);
    -  end;
    -  Result:=Assigned(MsgItem);
    -  if Result then
    -    try
    -      FMsgType := MsgItem.MsgType;
    -      MsgItem.Stream.Position := 0;
    -      FMsgData.Size := 0;
    -      FMsgData.CopyFrom(MsgItem.Stream, MsgItem.Stream.Size);
    -    finally
    -      MsgItem.Free;
    -    end;
    +  CheckThreadContext;
    +  // Access to the message is not locked by design!
    +  if Assigned(FMessage) then
    +    FreeAndNil(FMessage);
    +  FMessage := FQueue.Pop;
    +  Result := Assigned(FMessage);
     end;
     
     procedure TSimpleIPCServer.ReadMessage;
    -
     begin
    +  CheckThreadContext;
       CheckActive;
       FBusy:=True;
       Try
         if (FQueue.Count=0) then
    -      // Readmessage pushes a message to the queue
    +      // ReadMessage pushes a message to the queue
           FIPCComm.ReadMessage;
         if PopMessage then
    -      If Assigned(FOnMessage) then
    -        FOnMessage(Self);
    +      DoOnMessage;
       Finally
         FBusy:=False;
       end;
     end;
     
    +procedure TSimpleIPCServer.PushMessage(Msg: TIPCServerMsg);
    +var
    +  Error: Boolean;
    +begin
    +  Error := False;
    +  try
    +    // Queue.Push may throw an exception, e.g. if message queue is full.
    +    Queue.Push(Msg);
    +  except
    +    Error := True;
    +  end;
    +  if Error then
    +    // Handler must free the Msg, because it is not owned by anybody.
    +    DoOnMessageError(Msg)
    +  else
    +    DoOnMessageQueued;
    +end;
    +
    +function TSimpleIPCServer.GetMsgType: TMessageType;
    +begin
    +  // Access to the message is not locked by design!
    +  if Assigned(FMessage) then
    +    Result := FMessage.MsgType
    +  else
    +    Result := mtUnknown;
    +end;
    +
    +function TSimpleIPCServer.GetMsgData: TStream;
    +begin
    +  // Access to the message is not locked by design!
    +  if Assigned(FMessage) then
    +    Result := FMessage.Stream
    +  else
    +    Result := nil;
    +end;
    +
     procedure TSimpleIPCServer.GetMessageData(Stream: TStream);
     begin
    -  Stream.CopyFrom(FMsgData,0);
    +  // Access to the message is not locked by design!
    +  if Assigned(FMessage) then
    +    Stream.CopyFrom(FMessage.Stream, 0);
     end;
     
    +function TSimpleIPCServer.GetStringMessage: String;
    +begin
    +  // Access to the message is not locked by design!
    +  if Assigned(FMessage) then
    +    Result := FMessage.StringMessage
    +  else
    +    Result := '';
    +end;
    +
     procedure TSimpleIPCServer.Activate;
     begin
       StartServer;
    @@ -700,62 +906,87 @@
       StopServer;
     end;
     
    +procedure TSimpleIPCServer.DoOnMessage;
    +begin
    +  if Assigned(FOnMessage) then
    +  begin
    +    if FSynchronizeEvents and Assigned(FThread) then
    +      TThread.Synchronize(FThread, @InternalDoOnMessage)
    +    else
    +      InternalDoOnMessage;
    +  end;
    +end;
     
    -procedure TSimpleIPCServer.DoMessageQueued;
    +procedure TSimpleIPCServer.InternalDoOnMessage;
    +begin
    +  if Assigned(FOnMessage) then
    +    FOnMessage(Self);
    +end;
     
    +procedure TSimpleIPCServer.DoOnMessageQueued;
     begin
       if Assigned(FOnMessageQueued) then
    +  begin
    +    if FSynchronizeEvents and Assigned(FThread) then
    +      TThread.Synchronize(FThread, @InternalDoOnMessageQueued)
    +    else
    +      InternalDoOnMessageQueued;
    +  end;
    +end;
    +
    +procedure TSimpleIPCServer.InternalDoOnMessageQueued;
    +begin
    +  if Assigned(FOnMessageQueued) then
         FOnMessageQueued(Self);
     end;
     
    -procedure TSimpleIPCServer.DoMessageError;
    +procedure TSimpleIPCServer.DoOnMessageError(Msg: TIPCServerMsg);
     begin
       try
    -    if Assigned(FOnMessageQueued) then
    -      FOnMessageError(Self,FErrMsg);
    +    if Assigned(FOnMessageError) then
    +    begin
    +      // Temp message (class instance variable) is used to pass
    +      // a parameter to a synchronized thread method.
    +      FTempMessage := Msg;
    +      if FSynchronizeEvents and Assigned(FThread) then
    +        TThread.Synchronize(FThread, @InternalDoOnMessageError)
    +      else
    +        InternalDoOnMessageError;
    +    end;
       finally
    -    FreeAndNil(FErrMsg)
    +    // Must free the message because it is not owned by anybody.
    +    FTempMessage := nil;
    +    FreeAndNil(Msg);
       end;
     end;
     
    -procedure TSimpleIPCServer.PushMessage(Msg: TIPCServerMsg);
    +procedure TSimpleIPCServer.InternalDoOnMessageError;
    +begin
    +  if Assigned(FOnMessageError) then
    +    FOnMessageError(Self, FTempMessage);
    +end;
     
    -Var
    -  DoLock : Boolean;
    -
    +procedure TSimpleIPCServer.DoOnThreadError;
     begin
    -  try
    -    DoLock:=Assigned(FThread);
    -    If DoLock then
    -      EnterCriticalsection(FLock);
    -    try
    -      Queue.Push(Msg);
    -    finally
    -      If DoLock then
    -        LeaveCriticalsection(FLock);
    -    end;
    -    if DoLock then
    -      TThread.Synchronize(FThread,@DoMessageQueued)
    +  if Assigned(FOnThreadError) then
    +  begin
    +    if FSynchronizeEvents and Assigned(FThread) then
    +      TThread.Synchronize(FThread, @InternalDoOnThreadError)
         else
    -      DoMessageQueued;
    -  except
    -    On E : Exception do
    -      FErrMsg:=Msg;
    +      InternalDoOnThreadError;
       end;
    -  if Assigned(FErrMsg) then
    -    if DoLock then
    -      TThread.Synchronize(FThread,@DoMessageError)
    -    else
    -      DoMessageQueued;
    +end;
     
    +procedure TSimpleIPCServer.InternalDoOnThreadError;
    +begin
    +  if Assigned(FOnThreadError) then
    +    FOnThreadError(Self);
     end;
     
    +{$ENDREGION}
     
    +{$REGION 'TSimpleIPCClient'}
     
    -{ ---------------------------------------------------------------------
    -    TSimpleIPCClient
    -  ---------------------------------------------------------------------}
    -
     procedure TSimpleIPCClient.SetServerInstance(const AValue: String);
     begin
       CheckInactive;
    @@ -776,7 +1007,7 @@
       inherited Create(AOwner);
     end;
     
    -destructor TSimpleIPCClient.destroy;
    +destructor TSimpleIPCClient.Destroy;
     begin
       Active:=False;
       Inherited;
    @@ -785,7 +1016,8 @@
     procedure TSimpleIPCClient.Connect;
     begin
       If Not assigned(FIPCComm) then
    -    begin
    +  begin
    +    PrepareServerID;
         FIPCComm:=CommClass.Create(Self);
         Try
           FIPCComm.Connect;
    @@ -794,7 +1026,7 @@
           Raise;
         end;  
         FActive:=True;
    -    end;
    +  end;
     end;
     
     procedure TSimpleIPCClient.Disconnect;
    @@ -809,21 +1041,24 @@
     end;
     
     function TSimpleIPCClient.ServerRunning: Boolean;
    -
    +var
    +  TempComm: TIPCClientComm;
     begin
       If Assigned(FIPCComm) then
         Result:=FIPCComm.ServerRunning
       else
    -    With CommClass.Create(Self) do
    -      Try
    -        Result:=ServerRunning;
    -      finally
    -        Free;
    -      end;
    +  begin
    +    PrepareServerID;
    +    TempComm := CommClass.Create(Self);
    +    Try
    +      Result := TempComm.ServerRunning;
    +    finally
    +      TempComm.Free;
    +    end;
    +  end;
     end;
     
     procedure TSimpleIPCClient.SendMessage(MsgType : TMessageType; Stream: TStream);
    -
     begin
       CheckActive;
       FBusy:=True;
    @@ -839,8 +1074,7 @@
       SendStringMessage(mtString,Msg);
     end;
     
    -procedure TSimpleIPCClient.SendStringMessage(MsgType: TMessageType; const Msg: String
    -  );
    +procedure TSimpleIPCClient.SendStringMessage(MsgType: TMessageType; const Msg: String);
     Var
       S : TStringStream;
     begin
    @@ -864,11 +1098,14 @@
       SendStringMessage(MsgType, Format(Msg,Args));
     end;
     
    +{$ENDREGION}
    +
     {$IFDEF OSNEEDIPCINITDONE}
     initialization
       IPCInit;
     finalization
       IPCDone;
    -{$ENDIF}  
    +{$ENDIF}
    +
     end.
     
    
    simpleipc.pp.patch (28,214 bytes)
  • ThreadedIPC.zip (97,398 bytes)
  • ThreadedIPCGUI.zip (861,345 bytes)
  • 2017-08-10-simpleipc.patch (31,427 bytes)
    Index: packages/fcl-process/src/simpleipc.pp
    ===================================================================
    --- packages/fcl-process/src/simpleipc.pp	(revision 36715)
    +++ packages/fcl-process/src/simpleipc.pp	(working copy)
    @@ -20,13 +20,12 @@
     interface
     
     uses
    -  Contnrs, Classes, SysUtils;
    +  Contnrs, SyncObjs, Classes, SysUtils;
     
    -Const
    +const
       MsgVersion = 1;
    -  DefaultThreadTimeOut = 50;
     
    -  //Message types
    +  { IPC message types }
       mtUnknown = 0;
       mtString = 1;
     
    @@ -33,13 +32,8 @@
     type
       TIPCMessageOverflowAction = (ipcmoaNone, ipcmoaDiscardOld, ipcmoaDiscardNew, ipcmoaError);
     
    -var
    -  DefaultIPCMessageOverflowAction: TIPCMessageOverflowAction = ipcmoaNone;
    -  DefaultIPCMessageQueueLimit: Integer = 0;
    +  TMessageType = LongInt;
     
    -Type
    -
    -  TMessageType = LongInt;
       TMsgHeader = Packed record
         Version : Byte;
         MsgType : TMessageType;
    @@ -49,17 +43,29 @@
       TSimpleIPCServer = class;
       TSimpleIPCClient = class;
     
    +  { TIPCServerMsg }
       TIPCServerMsg = class
    +  private type
    +    TStreamClass = class of TStream;
    +  private const
    +    // TMemoryStream uses an effecient grow algorithm.
    +    DefaultStreamClass: TStreamClass = TMemoryStream;
       strict private
         FStream: TStream;
    +    FOwnsStream: Boolean;
         FMsgType: TMessageType;
    +    function GetStringMessage: String;
       public
         constructor Create;
    +    constructor Create(AStream: TStream; AOwnsStream: Boolean = True);
         destructor Destroy; override;
         property Stream: TStream read FStream;
         property MsgType: TMessageType read FMsgType write FMsgType;
    +    property OwnsStream: Boolean read FOwnsStream write FOwnsStream;
    +    property StringMessage: String read GetStringMessage;
       end;
     
    +  { TIPCServerMsgQueue }
       TIPCServerMsgQueue = class
       strict private
         FList: TFPObjectList;
    @@ -80,7 +86,6 @@
       end;
     
       { TIPCServerComm }
    -  
       TIPCServerComm = Class(TObject)
       Private
         FOwner  : TSimpleIPCServer;
    @@ -94,10 +99,10 @@
         Property Owner : TSimpleIPCServer read FOwner;
         Procedure StartServer; virtual; Abstract;
         Procedure StopServer;virtual; Abstract;
    -    // May push messages on the queue
    -    Function  PeekMessage(TimeOut : Integer) : Boolean;virtual; Abstract;
    -    // Must put message on the queue.
    -    Procedure ReadMessage ;virtual; Abstract;
    +    // Check for new messages, may read and push messages to the queue.
    +    Function PeekMessage(Timeout: Integer): Boolean; virtual; Abstract;
    +    // Read and push new message to the queue, if not done by PeekMessage.
    +    Procedure ReadMessage; virtual; Abstract;
         Property InstanceID : String read GetInstanceID;
       end;
       TIPCServerCommClass = Class of TIPCServerComm;
    @@ -111,6 +116,7 @@
         FBusy: Boolean;
         FActive : Boolean;
         FServerID : String;
    +    procedure PrepareServerID;
         Procedure DoError(const Msg: String; const Args: array of const);
         Procedure CheckInactive;
         Procedure CheckActive;
    @@ -123,26 +129,44 @@
         Property ServerID : String Read FServerID Write SetServerID;
       end;
     
    +  TMessageQueueEvent = Procedure(Sender: TObject; Msg: TIPCServerMsg) of object;
    +
       { TSimpleIPCServer }
    -
    -  TMessageQueueEvent = Procedure(Sender : TObject; Msg : TIPCServerMsg) of object;
    -
       TSimpleIPCServer = Class(TSimpleIPC)
    -  protected
    -  Private
    +  private const
    +    DefaultThreaded = False;
    +    DefaultThreadTimeout = 50;
    +    DefaultSynchronizeEvents = True;
    +    DefaultMaxAction = ipcmoaNone;
    +    DefaultMaxQueue = 0;
    +  private
         FOnMessageError: TMessageQueueEvent;
         FOnMessageQueued: TNotifyEvent;
    -    FQueue : TIPCServerMsgQueue;
    +    FOnMessage: TNotifyEvent;
    +    FOnThreadError: TNotifyEvent;
    +    FQueue: TIPCServerMsgQueue;
    +    FQueueLock: TCriticalSection;
    +    FQueueAddEvent: TSimpleEvent;
         FGlobal: Boolean;
    -    FOnMessage: TNotifyEvent;
    -    FMsgType: TMessageType;
    -    FMsgData : TStream;
    -    FThreadTimeOut: Integer;
    -    FThread : TThread;
    -    FLock : TRTLCriticalSection;
    -    FErrMsg : TIPCServerMsg;
    -    procedure DoMessageQueued;
    -    procedure DoMessageError;
    +    // Access to the message is not locked by design!
    +    // In the threaded mode, it must be accessed only during event callbacks.
    +    FMessage: TIPCServerMsg;
    +    FTempMessage: TIPCServerMsg;
    +    FThreaded: Boolean;
    +    FThreadTimeout: Integer;
    +    FThreadError: String;
    +    FThreadExecuting: Boolean;
    +    FThreadReadyEvent: TSimpleEvent;
    +    FThread: TThread;
    +    FSynchronizeEvents: Boolean;
    +    procedure DoOnMessage;
    +    procedure DoOnMessageQueued;
    +    procedure DoOnMessageError(Msg: TIPCServerMsg);
    +    procedure DoOnThreadError;
    +    procedure InternalDoOnMessage;
    +    procedure InternalDoOnMessageQueued;
    +    procedure InternalDoOnMessageError;
    +    procedure InternalDoOnThreadError;
         function GetInstanceID: String;
         function GetMaxAction: TIPCMessageOverflowAction;
         function GetMaxQueue: Integer;
    @@ -150,31 +174,43 @@
         procedure SetGlobal(const AValue: Boolean);
         procedure SetMaxAction(AValue: TIPCMessageOverflowAction);
         procedure SetMaxQueue(AValue: Integer);
    -  Protected
    +    procedure SetThreaded(AValue: Boolean);
    +    procedure SetThreadTimeout(AValue: Integer);
    +    procedure SetSynchronizeEvents(AValue: Boolean);
    +    function WaitForReady(Timeout: Integer = -1): Boolean;
    +    function GetMsgType: TMessageType;
    +    function GetMsgData: TStream;
    +  protected
         FIPCComm: TIPCServerComm;
    -    procedure StartThread; virtual;
    -    procedure StopThread; virtual;
         Function CommClass : TIPCServerCommClass; virtual;
         Procedure PushMessage(Msg : TIPCServerMsg); virtual;
         function PopMessage: Boolean; virtual;
    +    procedure StartComm; virtual;
    +    procedure StopComm; virtual;
    +    function StartThread: Boolean; virtual;
    +    procedure StopThread; virtual;
         Procedure Activate; override;
         Procedure Deactivate; override;
    +    function ProcessMessage(Timeout: Integer): Boolean;
         Property Queue : TIPCServerMsgQueue Read FQueue;
         Property Thread : TThread Read FThread;
       Public
         Constructor Create(AOwner : TComponent); override;
         Destructor Destroy; override;
    -    Procedure StartServer(Threaded : Boolean = False);
    +    Procedure StartServer;
    +    Procedure StartServer(AThreaded: Boolean);
         Procedure StopServer;
    -    Function PeekMessage(TimeOut : Integer; DoReadMessage : Boolean): Boolean;
    -    Procedure ReadMessage;
    +    Function PeekMessage(Timeout: Integer; DoReadMessage: Boolean): Boolean;
    +    Function ReadMessage: Boolean;
         Property  StringMessage : String Read GetStringMessage;
         Procedure GetMessageData(Stream : TStream);
    -    Property  MsgType: TMessageType Read FMsgType;
    -    Property  MsgData : TStream Read FMsgData;
    +    Property  Message: TIPCServerMsg read FMessage;
    +    Property  MsgType: TMessageType Read GetMsgType;
    +    Property  MsgData: TStream Read GetMsgData;
         Property  InstanceID : String Read GetInstanceID;
    +    property  ThreadExecuting: Boolean read FThreadExecuting;
    +    property  ThreadError: String read FThreadError;
       Published
    -    Property ThreadTimeOut : Integer Read FThreadTimeOut Write FThreadTimeOut;
         Property Global : Boolean Read FGlobal Write SetGlobal;
         // Called during ReadMessage
         Property OnMessage : TNotifyEvent Read FOnMessage Write FOnMessage;
    @@ -182,14 +218,21 @@
         Property OnMessageQueued : TNotifyEvent Read FOnMessageQueued Write FOnMessageQueued;
         // Called when the queue overflows and  MaxAction = ipcmoaError.
         Property OnMessageError : TMessageQueueEvent Read FOnMessageError Write FOnMessageError;
    +    // Called when the server thread catches an exception.
    +    property OnThreadError: TNotifyEvent read FOnThreadError write FOnThreadError;
         // Maximum number of messages to keep in the queue
    -    property MaxQueue: Integer read GetMaxQueue write SetMaxQueue;
    +    property MaxQueue: Integer read GetMaxQueue write SetMaxQueue default DefaultMaxQueue;
         // What to do when the queue overflows
    -    property MaxAction: TIPCMessageOverflowAction read GetMaxAction write SetMaxAction;
    +    property MaxAction: TIPCMessageOverflowAction read GetMaxAction write SetMaxAction default DefaultMaxAction;
    +    // Instruct IPC server to operate in a threaded mode.
    +    property Threaded: Boolean read FThreaded write SetThreaded;
    +    // Amount of time thread waits for a message before checking for termination.
    +    property ThreadTimeout: Integer read FThreadTimeout write SetThreadTimeout default DefaultThreadTimeout;
    +    // Synchronize events with the main thread when in threaded mode.
    +    property SynchronizeEvents: Boolean read FSynchronizeEvents write SetSynchronizeEvents default DefaultSynchronizeEvents;
       end;
     
    -
    -  { TIPCClientComm}
    +  { TIPCClientComm }
       TIPCClientComm = Class(TObject)
       private
         FOwner: TSimpleIPCClient;
    @@ -229,17 +272,23 @@
         Property  ServerInstance : String Read FServerInstance Write SetServerInstance;
       end;
     
    -
       EIPCError = Class(Exception);
     
    -Var
    +var
       DefaultIPCServerClass : TIPCServerCommClass = Nil;
       DefaultIPCClientClass : TIPCClientCommClass = Nil;
     
    +var
    +  DefaultIPCMessageOverflowAction: TIPCMessageOverflowAction = TSimpleIPCServer.DefaultMaxAction;
    +  DefaultIPCMessageQueueLimit: Integer = TSimpleIPCServer.DefaultMaxQueue;
    +
     resourcestring
       SErrServerNotActive = 'Server with ID %s is not active.';
       SErrActive = 'This operation is illegal when the server is active.';
       SErrInActive = 'This operation is illegal when the server is inactive.';
    +  SErrThreadContext = 'This operation is illegal outside of IPC thread context.';
    +  SErrThreadFailure = 'IPC thread failure.';
    +  SErrMessageQueueOverflow = 'Message queue overflow (limit %s)';
     
     
     implementation
    @@ -252,35 +301,108 @@
       This comes first, to allow the uses clause to be set.
       If the include file defines OSNEEDIPCINITDONE then the unit will
       call IPCInit and IPCDone in the initialization/finalization code.
    -  
       --------------------------------------------------------------------- }
     {$UNDEF OSNEEDIPCINITDONE}
     
     {$i simpleipc.inc}
     
    -Resourcestring
    -  SErrMessageQueueOverflow = 'Message queue overflow (limit %s)';
    +// Convert content of any stream type to a string.
    +function FastStreamToString(Stream: TStream): String;
    +var
    +  CharCount, CharSize: Integer;
    +  StringStream: TStringStream;
    +  OldPosition: Int64;
    +begin
    +  // Optimized for TStringStream
    +  if Stream is TStringStream then
    +  begin
    +    Result := TStringStream(Stream).DataString;
    +  end
    +  // Optimized for TCustomMemoryStream
    +  else if Stream is TCustomMemoryStream then
    +  begin
    +    Result := '';
    +    CharSize := StringElementSize(Result);
    +    CharCount := Stream.Size div CharSize;
    +    SetLength(Result, CharCount);
    +    Move(TCustomMemoryStream(Stream).Memory^, Result[1], CharCount * CharSize);
    +  end
    +  // Any other stream type
    +  else
    +  begin
    +    OldPosition := Stream.Position;
    +    try
    +      StringStream := TStringStream.Create('');
    +      try
    +        Stream.Position := 0;
    +        StringStream.CopyFrom(Stream, Stream.Size);
    +        Result := StringStream.DataString;
    +      finally
    +        StringStream.Free;
    +      end;
    +    finally
    +      Stream.Position := OldPosition;
    +    end;
    +  end;
    +end;
     
    -{ ---------------------------------------------------------------------
    -    TIPCServerMsg
    -  ---------------------------------------------------------------------}
    +// Timeout values:
    +//   >  0  -- Number of milliseconds to wait
    +//   =  0  -- return immediately
    +//   = -1  -- wait infinitely (converted to INFINITE)
    +//   < -1  -- wait infinitely (converted to INFINITE)
    +function IPCTimeoutToEventTimeout(Timeout: Integer): Cardinal; inline;
    +begin
    +  if Timeout >= 0 then
    +    Result := Timeout
    +  else
    +    Result := SyncObjs.INFINITE;
    +end;
     
    +// Timeout values:
    +//   >  0  -- Number of milliseconds to wait
    +//   =  0  -- return immediately
    +//   = -1  -- wait infinitely
    +//   < -1  -- wait infinitely (force to -1)
    +function IPCTimeoutSanitized(Timeout: Integer): Integer; inline;
    +begin
    +  if Timeout >= 0 then
    +    Result := Timeout
    +  else
    +    Result := -1;
    +end;
     
    +{$REGION 'TIPCServerMsg'}
    +
     constructor TIPCServerMsg.Create;
     begin
    -  FMsgType := 0;
    -  FStream := TMemoryStream.Create;
    +  FMsgType := mtUnknown;
    +  FStream := Self.DefaultStreamClass.Create;
    +  FOwnsStream := True;
     end;
     
    +constructor TIPCServerMsg.Create(AStream: TStream; AOwnsStream: Boolean);
    +begin
    +  FMsgType := mtUnknown;
    +  FStream := AStream;
    +  FOwnsStream := AOwnsStream;
    +end;
    +
     destructor TIPCServerMsg.Destroy;
     begin
    -  FStream.Free;
    +  if FOwnsStream then
    +    FreeAndNil(FStream);
     end;
     
    -{ ---------------------------------------------------------------------
    -    TIPCServerMsgQueue
    -  ---------------------------------------------------------------------}
    +function TIPCServerMsg.GetStringMessage: String;
    +begin
    +  Result := FastStreamToString(FStream);
    +end;
     
    +{$ENDREGION}
    +
    +{$REGION 'TIPCServerMsgQueue'}
    +
     constructor TIPCServerMsgQueue.Create;
     begin
       FMaxCount := DefaultIPCMessageQueueLimit;
    @@ -336,6 +458,7 @@
     
     procedure TIPCServerMsgQueue.Push(AItem: TIPCServerMsg);
     begin
    +  // PrepareToPush may throw an exception, e.g. if message queue is full.
       if PrepareToPush then
         FList.Insert(0, AItem);
     end;
    @@ -355,10 +478,9 @@
         Result := nil;
     end;
     
    +{$ENDREGION}
     
    -{ ---------------------------------------------------------------------
    -    TIPCServerComm
    -  ---------------------------------------------------------------------}
    +{$REGION 'TIPCServerComm'}
     
     constructor TIPCServerComm.Create(AOwner: TSimpleIPCServer);
     begin
    @@ -366,16 +488,13 @@
     end;
     
     procedure TIPCServerComm.DoError(const Msg: String; const Args: array of const);
    -
     begin
       FOwner.DoError(Msg,Args);
     end;
     
     procedure TIPCServerComm.PushMessage(const Hdr: TMsgHeader; AStream: TStream);
    -
    -Var
    +var
       M : TIPCServerMsg;
    -
     begin
       M:=TIPCServerMsg.Create;
       try
    @@ -394,9 +513,9 @@
       FOwner.PushMessage(Msg);
     end;
     
    -{ ---------------------------------------------------------------------
    -    TIPCClientComm
    -  ---------------------------------------------------------------------}
    +{$ENDREGION}
    +
    +{$REGION 'TIPCClientComm'}
       
     constructor TIPCClientComm.Create(AOwner: TSimpleIPCClient);
     begin
    @@ -409,10 +528,10 @@
       FOwner.DoError(Msg,Args);
     end;  
     
    -{ ---------------------------------------------------------------------
    -    TSimpleIPC
    -  ---------------------------------------------------------------------}
    +{$ENDREGION}
     
    +{$REGION 'TSimpleIPC'}
    +
     Procedure TSimpleIPC.DoError(const Msg: String; const Args: array of const);
     var
       FullMsg: String;
    @@ -441,7 +560,7 @@
     procedure TSimpleIPC.SetActive(const AValue: Boolean);
     begin
       if (FActive<>AValue) then
    -    begin
    +  begin
         if ([]<>([csLoading,csDesigning]*ComponentState)) then
           FActive:=AValue
         else  
    @@ -449,37 +568,90 @@
             Activate
           else
             Deactivate;
    -    end;
    +  end;
     end;
     
     procedure TSimpleIPC.SetServerID(const AValue: String);
     begin
       if (FServerID<>AValue) then
    -    begin
    +  begin
         CheckInactive;
    -    FServerID:=AValue
    -    end;
    +    FServerID:=AValue;
    +  end;
     end;
     
    -Procedure TSimpleIPC.Loaded; 
    +procedure TSimpleIPC.PrepareServerID;
    +begin
    +  if FServerID = '' then
    +    FServerID := ApplicationName;
    +  // Extra precaution for thread-safety
    +  UniqueString(FServerID);
    +end;
     
    -Var
    +procedure TSimpleIPC.Loaded;
    +var
       B : Boolean;
    -
     begin
    -  Inherited;
    +  inherited;
       B:=FActive;
       if B then
    -    begin
    -    Factive:=False;
    +  begin
    +    FActive:=False;
         Activate;
    +  end;
    +end;
    +
    +{$ENDREGION}
    +
    +{$REGION 'TIPCServerThread'}
    +
    +type
    +  TIPCServerThread = class(TThread)
    +  private
    +    FServer: TSimpleIPCServer;
    +  protected
    +    procedure Execute; override;
    +  public
    +    constructor Create(AServer: TSimpleIPCServer);
    +    property Server: TSimpleIPCServer read FServer;
    +  end;
    +
    +constructor TIPCServerThread.Create(AServer: TSimpleIPCServer);
    +begin
    +  inherited Create(True); // CreateSuspended = True
    +  FServer := AServer;
    +end;
    +
    +procedure TIPCServerThread.Execute;
    +begin
    +  FServer.FThreadExecuting := True;
    +  try
    +    FServer.StartComm;
    +    try
    +      // Notify server that thread has started.
    +      FServer.FThreadReadyEvent.SetEvent;
    +      // Run message loop
    +      while not Terminated do
    +        FServer.ProcessMessage(FServer.ThreadTimeout);
    +    finally
    +      FServer.StopComm;
         end;
    +  except on E: Exception do
    +    begin
    +      FServer.FThreadExecuting := False;
    +      FServer.FThreadError := E.Message;
    +      // Trigger event to wake up the caller from potentially indefinite wait.
    +      FServer.FThreadReadyEvent.SetEvent;
    +      FServer.DoOnThreadError;
    +    end;
    +  end;
    +  FServer.FThreadExecuting := False;
     end;
     
    -{ ---------------------------------------------------------------------
    -    TSimpleIPCServer
    -  ---------------------------------------------------------------------}
    +{$ENDREGION}
     
    +{$REGION 'TSimpleIPCServer'}
    +
     constructor TSimpleIPCServer.Create(AOwner: TComponent);
     begin
       inherited Create(AOwner);
    @@ -486,9 +658,11 @@
       FGlobal:=False;
       FActive:=False;
       FBusy:=False;
    -  FMsgData:=TStringStream.Create('');
    +  FMessage:=nil;
       FQueue:=TIPCServerMsgQueue.Create;
    -  FThreadTimeOut:=DefaultThreadTimeOut;
    +  FThreaded:=DefaultThreaded;
    +  FThreadTimeout:=DefaultThreadTimeout;
    +  FSynchronizeEvents:=DefaultSynchronizeEvents;
     end;
     
     destructor TSimpleIPCServer.Destroy;
    @@ -495,26 +669,44 @@
     begin
       Active:=False;
       FreeAndNil(FQueue);
    -  FreeAndNil(FMsgData);
    +  if Assigned(FMessage) then
    +    FreeAndNil(FMessage);
       inherited Destroy;
     end;
     
     procedure TSimpleIPCServer.SetGlobal(const AValue: Boolean);
     begin
    -  if (FGlobal<>AValue) then
    -    begin
    -    CheckInactive;
    -    FGlobal:=AValue;
    -    end;
    +  CheckInactive;
    +  FGlobal:=AValue;
     end;
     
    +procedure TSimpleIPCServer.SetThreaded(AValue: Boolean);
    +begin
    +  CheckInactive;
    +  FThreaded:=AValue;
    +end;
    +
    +procedure TSimpleIPCServer.SetThreadTimeout(AValue: Integer);
    +begin
    +  CheckInactive;
    +  FThreadTimeout:=AValue;
    +end;
    +
    +procedure TSimpleIPCServer.SetSynchronizeEvents(AValue: Boolean);
    +begin
    +  CheckInactive;
    +  FSynchronizeEvents:=AValue;
    +end;
    +
     procedure TSimpleIPCServer.SetMaxAction(AValue: TIPCMessageOverflowAction);
     begin
    +  CheckInactive;
       FQueue.MaxAction:=AValue;
     end;
     
     procedure TSimpleIPCServer.SetMaxQueue(AValue: Integer);
     begin
    +  CheckInactive;
       FQueue.MaxCount:=AValue;
     end;
     
    @@ -533,163 +725,231 @@
       Result:=FQueue.MaxCount;
     end;
     
    +procedure TSimpleIPCServer.StartComm;
    +begin
    +  if Assigned(FIPCComm) then
    +    FreeAndNil(FIPCComm);
    +  FIPCComm := CommClass.Create(Self);
    +  FIPCComm.StartServer;
    +end;
     
    -function TSimpleIPCServer.GetStringMessage: String;
    +procedure TSimpleIPCServer.StopComm;
     begin
    -  Result:=TStringStream(FMsgData).DataString;
    +  if Assigned(FIPCComm) then
    +  begin
    +    FIPCComm.StopServer;
    +    FreeAndNil(FIPCComm);
    +  end;
     end;
     
    -
    -procedure TSimpleIPCServer.StartServer(Threaded : Boolean = False);
    +function TSimpleIPCServer.StartThread: Boolean;
     begin
    -  if Not Assigned(FIPCComm) then
    -    begin
    -    If (FServerID='') then
    -      FServerID:=ApplicationName;
    -    FIPCComm:=CommClass.Create(Self);
    -    FIPCComm.StartServer;
    -    end;
    -  FActive:=True;
    -  If Threaded then
    -    StartThread;
    +  FThreadError := '';
    +  FQueueLock := SyncObjs.TCriticalSection.Create;
    +  FQueueAddEvent := SyncObjs.TSimpleEvent.Create;
    +  FThreadReadyEvent := SyncObjs.TSimpleEvent.Create;
    +  FThread := TIPCServerThread.Create(Self);
    +  FThread.Start;
    +  Result := WaitForReady;
     end;
     
    -Type
    -
    -  { TServerThread }
    -
    -  TServerThread = Class(TThread)
    -  private
    -    FServer: TSimpleIPCServer;
    -    FThreadTimeout: Integer;
    -  Public
    -    Constructor Create(AServer : TSimpleIPCServer; ATimeout : integer);
    -    procedure Execute; override;
    -    Property Server : TSimpleIPCServer Read FServer;
    -    Property ThreadTimeout : Integer Read FThreadTimeout;
    +procedure TSimpleIPCServer.StopThread;
    +begin
    +  if Assigned(FThread) then
    +  begin
    +    FThread.Terminate;
    +    FThread.WaitFor;
    +    FreeAndNil(FThread);
       end;
    -
    -{ TServerThread }
    -
    -constructor TServerThread.Create(AServer: TSimpleIPCServer; ATimeout: integer);
    -begin
    -  FServer:=AServer;
    -  FThreadTimeout:=ATimeOut;
    -  Inherited Create(False);
    +  if Assigned(FThreadReadyEvent) then
    +    FreeAndNil(FThreadReadyEvent);
    +  if Assigned(FQueueAddEvent) then
    +    FreeAndNil(FQueueAddEvent);
    +  if Assigned(FQueueLock) then
    +    FreeAndNil(FQueueLock);
     end;
     
    -procedure TServerThread.Execute;
    +function TSimpleIPCServer.WaitForReady(Timeout: Integer = -1): Boolean;
     begin
    -  While Not Terminated do
    -    FServer.PeekMessage(ThreadTimeout,False);
    +  if FThreadReadyEvent.WaitFor(IPCTimeoutToEventTimeout(Timeout)) = wrSignaled then
    +    Result := FThreadExecuting
    +  else
    +    Result := False;
     end;
     
    -procedure TSimpleIPCServer.StartThread;
    -
    +procedure TSimpleIPCServer.StartServer;
     begin
    -  InitCriticalSection(FLock);
    -  FThread:=TServerThread.Create(Self,ThreadTimeOut);
    +  StartServer(FThreaded);
     end;
     
    -procedure TSimpleIPCServer.StopThread;
    -
    +procedure TSimpleIPCServer.StartServer(AThreaded: Boolean);
     begin
    -  if Assigned(FThread) then
    +  CheckInactive;
    +  FActive := True;
    +  try
    +    PrepareServerID;
    +    FThreaded := AThreaded;
    +    if FThreaded then
         begin
    -    FThread.Terminate;
    -    FThread.WaitFor;
    -    FreeAndNil(FThread);
    -    DoneCriticalSection(FLock);
    -    end;
    +      if not StartThread then
    +        raise EIPCError.Create(SErrThreadFailure);
    +    end
    +    else
    +      StartComm;
    +  except
    +    FActive := False;
    +    raise;
    +  end;
     end;
     
     procedure TSimpleIPCServer.StopServer;
     begin
       StopThread;
    -  If Assigned(FIPCComm) then
    -    begin
    -    FIPCComm.StopServer;
    -    FreeAndNil(FIPCComm);
    -    end;
    +  StopComm;
       FQueue.Clear;
    -  FActive:=False;
    +  FActive := False;
     end;
     
    -// TimeOut values:
    +function TSimpleIPCServer.ProcessMessage(Timeout: Integer): Boolean;
    +begin
    +  FBusy := True;
    +  try
    +    // Check for new messages (may push several messages to the queue)
    +    Result := FIPCComm.PeekMessage(IPCTimeoutSanitized(Timeout));
    +    // Push new message to the queue (explicitly)
    +    if Result then
    +      FIPCComm.ReadMessage;
    +  finally
    +    FBusy := False;
    +  end;
    +end;
    +
    +// Timeout values:
     //   >  0  -- Number of milliseconds to wait
     //   =  0  -- return immediately
     //   = -1  -- wait infinitely
     //   < -1  -- wait infinitely (force to -1)
    -function TSimpleIPCServer.PeekMessage(TimeOut: Integer; DoReadMessage: Boolean): Boolean;
    +function TSimpleIPCServer.PeekMessage(Timeout: Integer; DoReadMessage: Boolean): Boolean;
     begin
       CheckActive;
    -  Result:=Queue.Count>0;
    -  If Not Result then
    -    begin
    -    if TimeOut < -1 then
    -      TimeOut := -1;
    -    FBusy:=True;
    -    Try
    -      Result:=FIPCComm.PeekMessage(Timeout);
    -    Finally
    -      FBusy:=False;
    +
    +  if Threaded then
    +  begin
    +    // Check if have messages in the queue
    +    FQueueLock.Acquire;
    +    try
    +      Result:=FQueue.Count>0;
    +      // Reset queue add event
    +      if not Result then
    +        FQueueAddEvent.ResetEvent;
    +    finally
    +      FQueueLock.Release;
         end;
    -    end;
    +    // Wait for queue add event
    +    if not Result and (Timeout <> 0) then
    +      Result := FQueueAddEvent.WaitFor(IPCTimeoutToEventTimeout(Timeout)) = wrSignaled;
    +  end
    +  else
    +  begin
    +    // Check if have messages in the queue
    +    Result:=FQueue.Count>0;
    +    // If queue is empty, process new messages via IPC driver
    +    if not Result then
    +      Result := ProcessMessage(Timeout);
    +  end;
    +
    +  // Read message if available (be aware of a race condition in threaded mode)
       If Result then
         If DoReadMessage then
    -      Readmessage;
    +      ReadMessage;
     end;
     
    +function TSimpleIPCServer.ReadMessage: Boolean;
    +begin
    +  // Pop a message from the queue
    +  Result := PopMessage;
    +  if Result then
    +    DoOnMessage;
    +end;
    +
     function TSimpleIPCServer.PopMessage: Boolean;
    +begin
    +  if Threaded then
    +    FQueueLock.Acquire;
    +  try
    +    if Assigned(FMessage) then
    +      FreeAndNil(FMessage);
    +    FMessage := FQueue.Pop;
    +    Result := Assigned(FMessage);
    +  finally
    +    if Threaded then
    +      FQueueLock.Release;
    +  end;
    +end;
     
    +procedure TSimpleIPCServer.PushMessage(Msg: TIPCServerMsg);
     var
    -  MsgItem: TIPCServerMsg;
    -  DoLock : Boolean;
    -
    +  PushFailed: Boolean;
     begin
    -  DoLock:=Assigned(FThread);
    -  if DoLock then
    -    EnterCriticalsection(Flock);
    +  if Threaded then
    +    FQueueLock.Acquire;
       try
    -    MsgItem:=FQueue.Pop;
    +    PushFailed := False;
    +    try
    +      // Queue.Push may throw an exception, e.g. if message queue is full.
    +      FQueue.Push(Msg);
    +    except
    +      PushFailed := True;
    +    end;
    +    // Notify a waiting PeekMessage in threaded mode
    +    if Threaded and not PushFailed then
    +      FQueueAddEvent.SetEvent;
       finally
    -    if DoLock then
    -      LeaveCriticalsection(FLock);
    +    if Threaded then
    +      FQueueLock.Release;
       end;
    -  Result:=Assigned(MsgItem);
    -  if Result then
    -    try
    -      FMsgType := MsgItem.MsgType;
    -      MsgItem.Stream.Position := 0;
    -      FMsgData.Size := 0;
    -      FMsgData.CopyFrom(MsgItem.Stream, MsgItem.Stream.Size);
    -    finally
    -      MsgItem.Free;
    -    end;
    +
    +  if PushFailed then
    +    // Handler must free the Msg, because it is not owned by anybody.
    +    DoOnMessageError(Msg)
    +  else
    +    DoOnMessageQueued;
     end;
     
    -procedure TSimpleIPCServer.ReadMessage;
    +function TSimpleIPCServer.GetMsgType: TMessageType;
    +begin
    +  // Access to the message is not locked by design!
    +  if Assigned(FMessage) then
    +    Result := FMessage.MsgType
    +  else
    +    Result := mtUnknown;
    +end;
     
    +function TSimpleIPCServer.GetMsgData: TStream;
     begin
    -  CheckActive;
    -  FBusy:=True;
    -  Try
    -    if (FQueue.Count=0) then
    -      // Readmessage pushes a message to the queue
    -      FIPCComm.ReadMessage;
    -    if PopMessage then
    -      If Assigned(FOnMessage) then
    -        FOnMessage(Self);
    -  Finally
    -    FBusy:=False;
    -  end;
    +  // Access to the message is not locked by design!
    +  if Assigned(FMessage) then
    +    Result := FMessage.Stream
    +  else
    +    Result := nil;
     end;
     
     procedure TSimpleIPCServer.GetMessageData(Stream: TStream);
     begin
    -  Stream.CopyFrom(FMsgData,0);
    +  // Access to the message is not locked by design!
    +  if Assigned(FMessage) then
    +    Stream.CopyFrom(FMessage.Stream, 0);
     end;
     
    +function TSimpleIPCServer.GetStringMessage: String;
    +begin
    +  // Access to the message is not locked by design!
    +  if Assigned(FMessage) then
    +    Result := FMessage.StringMessage
    +  else
    +    Result := '';
    +end;
    +
     procedure TSimpleIPCServer.Activate;
     begin
       StartServer;
    @@ -700,62 +960,87 @@
       StopServer;
     end;
     
    +procedure TSimpleIPCServer.DoOnMessage;
    +begin
    +  if Assigned(FOnMessage) then
    +  begin
    +    if FSynchronizeEvents and Assigned(FThread) then
    +      TThread.Synchronize(FThread, @InternalDoOnMessage)
    +    else
    +      InternalDoOnMessage;
    +  end;
    +end;
     
    -procedure TSimpleIPCServer.DoMessageQueued;
    +procedure TSimpleIPCServer.InternalDoOnMessage;
    +begin
    +  if Assigned(FOnMessage) then
    +    FOnMessage(Self);
    +end;
     
    +procedure TSimpleIPCServer.DoOnMessageQueued;
     begin
       if Assigned(FOnMessageQueued) then
    +  begin
    +    if FSynchronizeEvents and Assigned(FThread) then
    +      TThread.Synchronize(FThread, @InternalDoOnMessageQueued)
    +    else
    +      InternalDoOnMessageQueued;
    +  end;
    +end;
    +
    +procedure TSimpleIPCServer.InternalDoOnMessageQueued;
    +begin
    +  if Assigned(FOnMessageQueued) then
         FOnMessageQueued(Self);
     end;
     
    -procedure TSimpleIPCServer.DoMessageError;
    +procedure TSimpleIPCServer.DoOnMessageError(Msg: TIPCServerMsg);
     begin
       try
    -    if Assigned(FOnMessageQueued) then
    -      FOnMessageError(Self,FErrMsg);
    +    if Assigned(FOnMessageError) then
    +    begin
    +      // Temp message (class instance variable) is used to pass
    +      // a parameter to a synchronized thread method.
    +      FTempMessage := Msg;
    +      if FSynchronizeEvents and Assigned(FThread) then
    +        TThread.Synchronize(FThread, @InternalDoOnMessageError)
    +      else
    +        InternalDoOnMessageError;
    +    end;
       finally
    -    FreeAndNil(FErrMsg)
    +    // Must free the message because it is not owned by anybody.
    +    FTempMessage := nil;
    +    FreeAndNil(Msg);
       end;
     end;
     
    -procedure TSimpleIPCServer.PushMessage(Msg: TIPCServerMsg);
    +procedure TSimpleIPCServer.InternalDoOnMessageError;
    +begin
    +  if Assigned(FOnMessageError) then
    +    FOnMessageError(Self, FTempMessage);
    +end;
     
    -Var
    -  DoLock : Boolean;
    -
    +procedure TSimpleIPCServer.DoOnThreadError;
     begin
    -  try
    -    DoLock:=Assigned(FThread);
    -    If DoLock then
    -      EnterCriticalsection(FLock);
    -    try
    -      Queue.Push(Msg);
    -    finally
    -      If DoLock then
    -        LeaveCriticalsection(FLock);
    -    end;
    -    if DoLock then
    -      TThread.Synchronize(FThread,@DoMessageQueued)
    +  if Assigned(FOnThreadError) then
    +  begin
    +    if FSynchronizeEvents and Assigned(FThread) then
    +      TThread.Synchronize(FThread, @InternalDoOnThreadError)
         else
    -      DoMessageQueued;
    -  except
    -    On E : Exception do
    -      FErrMsg:=Msg;
    +      InternalDoOnThreadError;
       end;
    -  if Assigned(FErrMsg) then
    -    if DoLock then
    -      TThread.Synchronize(FThread,@DoMessageError)
    -    else
    -      DoMessageQueued;
    +end;
     
    +procedure TSimpleIPCServer.InternalDoOnThreadError;
    +begin
    +  if Assigned(FOnThreadError) then
    +    FOnThreadError(Self);
     end;
     
    +{$ENDREGION}
     
    +{$REGION 'TSimpleIPCClient'}
     
    -{ ---------------------------------------------------------------------
    -    TSimpleIPCClient
    -  ---------------------------------------------------------------------}
    -
     procedure TSimpleIPCClient.SetServerInstance(const AValue: String);
     begin
       CheckInactive;
    @@ -776,7 +1061,7 @@
       inherited Create(AOwner);
     end;
     
    -destructor TSimpleIPCClient.destroy;
    +destructor TSimpleIPCClient.Destroy;
     begin
       Active:=False;
       Inherited;
    @@ -785,7 +1070,8 @@
     procedure TSimpleIPCClient.Connect;
     begin
       If Not assigned(FIPCComm) then
    -    begin
    +  begin
    +    PrepareServerID;
         FIPCComm:=CommClass.Create(Self);
         Try
           FIPCComm.Connect;
    @@ -794,7 +1080,7 @@
           Raise;
         end;  
         FActive:=True;
    -    end;
    +  end;
     end;
     
     procedure TSimpleIPCClient.Disconnect;
    @@ -809,21 +1095,24 @@
     end;
     
     function TSimpleIPCClient.ServerRunning: Boolean;
    -
    +var
    +  TempComm: TIPCClientComm;
     begin
       If Assigned(FIPCComm) then
         Result:=FIPCComm.ServerRunning
       else
    -    With CommClass.Create(Self) do
    -      Try
    -        Result:=ServerRunning;
    -      finally
    -        Free;
    -      end;
    +  begin
    +    PrepareServerID;
    +    TempComm := CommClass.Create(Self);
    +    Try
    +      Result := TempComm.ServerRunning;
    +    finally
    +      TempComm.Free;
    +    end;
    +  end;
     end;
     
     procedure TSimpleIPCClient.SendMessage(MsgType : TMessageType; Stream: TStream);
    -
     begin
       CheckActive;
       FBusy:=True;
    @@ -839,8 +1128,7 @@
       SendStringMessage(mtString,Msg);
     end;
     
    -procedure TSimpleIPCClient.SendStringMessage(MsgType: TMessageType; const Msg: String
    -  );
    +procedure TSimpleIPCClient.SendStringMessage(MsgType: TMessageType; const Msg: String);
     Var
       S : TStringStream;
     begin
    @@ -864,11 +1152,14 @@
       SendStringMessage(MsgType, Format(Msg,Args));
     end;
     
    +{$ENDREGION}
    +
     {$IFDEF OSNEEDIPCINITDONE}
     initialization
       IPCInit;
     finalization
       IPCDone;
    -{$ENDIF}  
    +{$ENDIF}
    +
     end.
     
    
  • 2017-08-10-ipcdrivers.patch (2,457 bytes)
    Index: packages/fcl-process/src/amicommon/simpleipc.inc
    ===================================================================
    --- packages/fcl-process/src/amicommon/simpleipc.inc	(revision 36715)
    +++ packages/fcl-process/src/amicommon/simpleipc.inc	(working copy)
    @@ -234,6 +234,7 @@
     var
       Temp: PByte;
       MsgType: TMessageType;
    +  Msg: TIPCServerMsg;
     begin
       if Assigned(MsgBody) then
       begin
    @@ -241,11 +242,18 @@
         Inc(Temp, SizeOf(Exec.TMessage));
         MsgType := 0;
         Move(Temp^, MsgType, SizeOf(TMessageType));
    -    Inc(Temp, SizeOf(TMessageType));    
    -    Owner.FMsgType := MsgType;
    -    Owner.FMsgData.Size := 0;
    -    Owner.FMsgData.Seek(0, soFrombeginning);
    -    Owner.FMsgData.WriteBuffer(temp^, MsgBody^.mn_Length);
    +    Inc(Temp, SizeOf(TMessageType));
    +
    +    Msg := TIPCServerMsg.Create;
    +    try
    +      Msg.MsgType := MsgType;
    +      Msg.Stream.WriteBuffer(Temp^, MsgBody^.mn_Length);
    +    except
    +      FreeAndNil(Msg);
    +      raise;
    +    end;
    +    PushMessage(Msg);
    +
         System.FreeMem(MsgBody);
         MsgBody := nil;
       end;
    Index: packages/fcl-process/src/unix/simpleipc.inc
    ===================================================================
    --- packages/fcl-process/src/unix/simpleipc.inc	(revision 36715)
    +++ packages/fcl-process/src/unix/simpleipc.inc	(working copy)
    @@ -131,8 +131,6 @@
       Private
         FFileName: String;
         FStream: TFileStream;
    -  Protected
    -    Procedure DoReadMessage; virtual;
       Public
         Constructor Create(AOWner : TSimpleIPCServer); override;
         Procedure StartServer; override;
    @@ -144,16 +142,7 @@
         Property Stream : TFileStream Read FStream;
       end;
     
    -procedure TPipeServerComm.DoReadMessage;
     
    -Var
    -  Hdr : TMsgHeader;
    -
    -begin
    -  FStream.ReadBuffer(Hdr,SizeOf(Hdr));
    -  PushMessage(Hdr,FStream);
    -end;
    -
     constructor TPipeServerComm.Create(AOWner: TSimpleIPCServer);
     begin
       inherited Create(AOWner);
    @@ -187,25 +176,20 @@
     end;
     
     function TPipeServerComm.PeekMessage(TimeOut: Integer): Boolean;
    -
     Var
       FDS : TFDSet;
    -
     begin
       fpfd_zero(FDS);
       fpfd_set(FStream.Handle,FDS);
    -  Result:=False;
    -  While fpSelect(FStream.Handle+1,@FDS,Nil,Nil,TimeOut)>0 do
    -    begin
    -    DoReadMessage;
    -    Result:=True;
    -    end;
    +  Result := fpSelect(FStream.Handle+1,@FDS,Nil,Nil,TimeOut)>0;
     end;
     
     procedure TPipeServerComm.ReadMessage;
    -
    +Var
    +  Hdr : TMsgHeader;
     begin
    -  DoReadMessage;
    +  FStream.ReadBuffer(Hdr,SizeOf(Hdr));
    +  PushMessage(Hdr,FStream);
     end;
     
     
    
  • ThreadedIPC-v2.zip (1,838 bytes)
  • ThreadedIPCGUI-v2.zip (128,723 bytes)

Relationships

related to 0033130 closedMichael Van Canneyt simpleipc example not work in thread mode 

Activities

Lars(L505)

2017-05-15 21:05

reporter   ~0100325

Last edited: 2017-05-15 21:10

View 3 revisions

Update: after seeing what is causing the problem, it appears to be the Terminated boolean. In Execute, the thread check boolean is not in a terminated state, even though StopServer calls terminated..

The code that leaves the program in a infinite loop:

procedure TServerThread.Execute;
begin
  While Not Terminated do
    FServer.PeekMessage(ThreadTimeout,False);
... // infinite loop above

StopServer calls terminate from StopThread:

procedure TSimpleIPCServer.StopThread;

Calls the following:
    FThread.Terminate and FThread.WaitFor

Not sure why the "Terminated" boolean is not being set to true: possibly a threading mind game I don't yet understand ;-)

Thaddy de Koning

2017-05-16 09:08

reporter   ~0100329

Terminated is only true if waitfor succeeds.

Michael Van Canneyt

2017-05-16 09:17

administrator   ~0100331

@Thaddy, Terminated of the thread is a simple boolean variable.
It has nothing to do with the terminated of the TSimpleIPCServer

Michael Van Canneyt

2017-05-16 14:07

administrator  

bugdemo.zip (129,127 bytes)

Michael Van Canneyt

2017-05-16 15:26

administrator   ~0100335

As far as I can see, the GetMessage() call on line 246 of simpleipc.inc never returns. It also does not return due to timer messages. (they do arrive in the windowproc)

I believe this is so because the FHWND does not belong to the thread that executes the loop. Of course then the loop in the thread.execute does finish either.

There are 2 options
- Use a different ProcessMessagesWait mechanism
- Create the window handle inside the thread.
Currently the TWinMsgServerComm object has no idea whether it is used in threaded or non-threaded mode.
This should maybe be adapted.

Lars(L505)

2017-05-18 00:57

reporter   ~0100379

Last edited: 2017-05-18 00:58

View 2 revisions

In regards to Thaddy's comment, doesn't waitfor force to wait for thread to finish? From what I can see waitfor is not successful, and does not return.

If I modify the source code and set "FreeOnTerminate" boolean true, and don't waitfor, it does not hang any more, but creates a memory leak.

Is this bug brand new though, where this actually worked before in a previous FPC version or lazarus version, but is now broken because of some different code? (regression testing)

Lars(L505)

2017-05-18 01:38

reporter   ~0100381

Last edited: 2017-05-18 01:41

View 3 revisions

MSDN documentation says

"A handle to the window whose messages are to be retrieved. The window must belong to the current thread."

So indeed the handle must belong to the current thread.

GetMessage return value:
"If there is an error, the return value is -1. For example, the function fails if hWnd is an invalid window handle or lpMsg is an invalid pointer. To get extended error information, call GetLastError."

"The possibility of a -1 return value in the case that hWnd is an invalid parameter"..."means that such code can lead to fatal application errors."

Michael Van Canneyt

2017-05-18 07:40

administrator   ~0100384

The problem is not in WaitFor. Waitfor simply waits for the thread to finish, but the thread does not finish, because it is stuck on the GetMessage call.

In my tests, the thread gets stuck on the first GetMessage call, when in fact it should not, since it should get at least the WM_TIMER messages. The only cause I can think of is the fact that the second thread does not own the window handle.
This is also the only thing that differs from the non-threaded case.

Denis Kozlov

2017-05-19 18:26

reporter   ~0100498

Michael, I confirm that a window and its message pipe have to be operated by the same thread that created it, for it to work correctly.

I see only one way to fix it. That is to have the new thread create and operate the window and its message pipe, which would require quite a bit of synchronization and message passing between two threads.

Alternatively, it may be possible to use Named Pipes instead of window messages to achieve IPC on Windows. I often wondered why Named Pipes never got used in FPC.

Denis Kozlov

2017-05-19 22:23

reporter   ~0100513

Incidentally, the current implementation of threaded IPC is unsafe, because the underlying platform-specific implementations are not guaranteed to be thread-safe.

Instead of the thread being managed internally by the IPC server itself, exposing IPC calls to at least two threads, perhaps IPC thread should be the outer wrapper that reimplements all public methods of IPC server with a build-in thread-safe access, shielding the internal IPC mechanism.

This also gets around the window messages problem, because the threaded IPC wrapper will be the one starting the IPC server inside its own thread context.

Michael Van Canneyt

2017-05-20 13:36

administrator   ~0100540

Threading should be an add-on, not the default. So I don't want to let the thread be responsible for the exposed 'api'.

I am aware that the mechanism is not thread-safe; it should be easily fixable by using a thread list for TIPCServerMsgQueue

What concerns named pipe: this is why there is a 'driver' approach.
I had in mind one day to implement a named pipe approach, but I never found time.

Denis Kozlov

2017-05-20 21:24

reporter   ~0100547

> Threading should be an add-on, not the default. So I don't want to let the thread be responsible for the exposed 'api'.

I didn't explain it well. What I proposed is a separate class which creates an instance of TSimpleIPCServer internally, manages it and runs the peek message loop, dispatching received messaged via a callback function. This will ensure that no other thread will have access to the 'driver'. This also solves a problem where a driver must be managed by only one thread, i.e. requirement of the current Windows implementation.

> I am aware that the mechanism is not thread-safe; it should be easily fixable by using a thread list for TIPCServerMsgQueue.

It won't make it thread safe, because public methods/properties of TSimpleIPCServer may still expose unsafe access to the driver.

In regards to the named pipes. I put together TNamedPipeStream, TNamedPipeServerStream and TNamedPipeClientStream classes which make the use of named pipes very easy, but it is for windows only. If it is of interest, I can share the code.

Michael Van Canneyt

2017-06-03 07:18

administrator   ~0100813

@Denis, can you show me what you mean for the threaded case ?

As for the named pipe:
if you can share the code, I can probably work it into the code so it is selectable: window handle/named pipe. That was the idea of using a driver class.

Marco van de Voort

2017-06-04 17:21

manager   ~0100858

(named pipe were very buggy on Win9x, which is why they were not used much before Windows XP became (very) dominant)

Denis Kozlov

2017-06-08 02:45

reporter   ~0100943

I will produce an example for the threaded case and will share the named pipes classes.

P.S. It might take me a week or two to get to it.

Thaddy de Koning

2017-06-08 11:24

reporter   ~0100945

Last edited: 2017-06-08 11:27

View 2 revisions

I tried to solve the issue with a (counting) semaphore. That seems to work, but I guess Denis solution is better.
My idea being that it takes a lot less synchronization as suggested.
I do not have a "clean" solution yet, but only ad-hoc code.

Denis Kozlov

2017-06-10 21:10

reporter   ~0101009

Threaded TSimpleIPCServer implementation:
https://github.com/dezlov/SimpleIPCServerThread/

This is just a proof of concept, so there is plenty room for improvement. For example, if both TSimpleIPCServer and TSimpleIPCServerThread could adhere to a common interface, then they could be used interchangeably in user land.

P.S. Named pipe classes will follow, a bit later.

Michael Van Canneyt

2017-06-10 22:05

administrator   ~0101010

In view of your remark, I don't understand why you coded it with a different interface to begin with ? :)

Then, maybe an idea to keep a single interface: at the current moment, the driver is unaware that it is working in a thread or not. We simply start a thread that does the polling

Maybe we can back off from this idea and make a "Threaded Driver" and a "normal driver"; the TSimpleServer can then create one or the other based on the 'threaded'
parameter. Your current code can be used to create the threaded driver, as it contains the synchronisation code.

Denis Kozlov

2017-06-13 14:41

reporter   ~0101092

This was just a demonstration of the concept. I wanted to 100% shield the underlying driver, which helped identify the changes necessary to make the TSimpleIPCServer threaded and thread-safe.

Now, I think I can merge this threaded concept back into TSimpleIPCServer. This should not affect the drivers.

I'll give it a try, if this concept is suitable.

Michael Van Canneyt

2017-06-13 15:58

administrator   ~0101095

I re-checked the code; I'd say, OK, go ahead...
Make sure you don't break the non-threaded case ;)

Denis Kozlov

2017-06-20 18:17

reporter   ~0101232

Just quick update...

I'm still working on it and it is looking good so far.

By the way, the Named Pipes code is here:
https://github.com/dezlov/NamedPipesFPC

Denis Kozlov

2017-07-10 00:44

reporter   ~0101645

It is done.

The attached patch reimplements the threaded mode of operation of TSimpleIPCServer to improve thread-safety and make operation of IPC driver fully encapsulated within IPC server thread context, plus few minor improvements.

No BC breaking changes were introduced. Example CLI and GUI projects attached, they include a full copy of simpleipc unit so they can be tested more easily. Tested on Windows platform.

Summary of changes:

* IPC server (in threaded mode) creates and destroys IPC driver within the context of its own dedicated thread. This addresses a limitation of some IPC drivers which have to be initialized and operated by the same thread only, e.g. Windows message queue based driver.
* Methods of IPC server which operate directly on the IPC driver can be executed only from within the IPC server thread context.
* Capture IPC thread error via OnThreadError event and access error message via ThreadError property.
* StartServer raises and exception if failed to start an IPC thread.
* Forbid modification of threaded variables/properties when IPC is active.
* Use a native instance of TIPCServerMsg to store the current message in the IPC server via the Message property, instead of duplicating the message content.
* TIPCServerMsg can take in foreign data streams with optional freeing.
* TIPCServerMsg can convert the data stream into a string.
* Fixed a small bug in TSimpleIPCServer.PushMessage: DoMessageQueued called instead of DoMessageError in case of exception in a non-threaded mode.
* Optional synchronization of server event callbacks with the main thread when in threaded mode. Enabled by default, but can be disabled for an increased performance if user wishes to take care of synchronization within the callback routines.
* Use TSimpleIPCServer.Threaded published property to toggle threaded mode of operation. This permits complete configuration at design time. StartServer(AThreaded) method remains for backwards compatibility, but may be deprecated.
* Make IPC client also use application name as the ServerID when ServerID property is empty, just like the IPC server.
* Explicit defaults for published properties of TSimpleIPCServer.

Denis Kozlov

2017-07-10 00:45

reporter  

simpleipc.pp.patch (28,214 bytes)
Index: packages/fcl-process/src/simpleipc.pp
===================================================================
--- packages/fcl-process/src/simpleipc.pp	(revision 36715)
+++ packages/fcl-process/src/simpleipc.pp	(working copy)
@@ -20,13 +20,12 @@
 interface
 
 uses
-  Contnrs, Classes, SysUtils;
+  Contnrs, SyncObjs, Classes, SysUtils;
 
-Const
+const
   MsgVersion = 1;
-  DefaultThreadTimeOut = 50;
 
-  //Message types
+  { IPC message types }
   mtUnknown = 0;
   mtString = 1;
 
@@ -33,13 +32,8 @@
 type
   TIPCMessageOverflowAction = (ipcmoaNone, ipcmoaDiscardOld, ipcmoaDiscardNew, ipcmoaError);
 
-var
-  DefaultIPCMessageOverflowAction: TIPCMessageOverflowAction = ipcmoaNone;
-  DefaultIPCMessageQueueLimit: Integer = 0;
+  TMessageType = LongInt;
 
-Type
-
-  TMessageType = LongInt;
   TMsgHeader = Packed record
     Version : Byte;
     MsgType : TMessageType;
@@ -49,17 +43,29 @@
   TSimpleIPCServer = class;
   TSimpleIPCClient = class;
 
+  { TIPCServerMsg }
   TIPCServerMsg = class
+  private type
+    TStreamClass = class of TStream;
+  private const
+    // TMemoryStream uses an effecient grow algorithm.
+    DefaultStreamClass: TStreamClass = TMemoryStream;
   strict private
     FStream: TStream;
+    FOwnsStream: Boolean;
     FMsgType: TMessageType;
+    function GetStringMessage: String;
   public
     constructor Create;
+    constructor Create(AStream: TStream; AOwnsStream: Boolean = True);
     destructor Destroy; override;
     property Stream: TStream read FStream;
     property MsgType: TMessageType read FMsgType write FMsgType;
+    property OwnsStream: Boolean read FOwnsStream write FOwnsStream;
+    property StringMessage: String read GetStringMessage;
   end;
 
+  { TIPCServerMsgQueue }
   TIPCServerMsgQueue = class
   strict private
     FList: TFPObjectList;
@@ -80,7 +86,6 @@
   end;
 
   { TIPCServerComm }
-  
   TIPCServerComm = Class(TObject)
   Private
     FOwner  : TSimpleIPCServer;
@@ -111,6 +116,7 @@
     FBusy: Boolean;
     FActive : Boolean;
     FServerID : String;
+    procedure PrepareServerID;
     Procedure DoError(const Msg: String; const Args: array of const);
     Procedure CheckInactive;
     Procedure CheckActive;
@@ -123,26 +129,42 @@
     Property ServerID : String Read FServerID Write SetServerID;
   end;
 
+  TMessageQueueEvent = Procedure(Sender: TObject; Msg: TIPCServerMsg) of object;
+
   { TSimpleIPCServer }
-
-  TMessageQueueEvent = Procedure(Sender : TObject; Msg : TIPCServerMsg) of object;
-
   TSimpleIPCServer = Class(TSimpleIPC)
-  protected
-  Private
+  private const
+    DefaultThreaded = False;
+    DefaultThreadTimeout = 50;
+    DefaultSynchronizeEvents = True;
+    DefaultMaxAction = ipcmoaNone;
+    DefaultMaxQueue = 0;
+  private
     FOnMessageError: TMessageQueueEvent;
     FOnMessageQueued: TNotifyEvent;
-    FQueue : TIPCServerMsgQueue;
+    FOnMessage: TNotifyEvent;
+    FOnThreadError: TNotifyEvent;
+    FQueue: TIPCServerMsgQueue;
     FGlobal: Boolean;
-    FOnMessage: TNotifyEvent;
-    FMsgType: TMessageType;
-    FMsgData : TStream;
-    FThreadTimeOut: Integer;
-    FThread : TThread;
-    FLock : TRTLCriticalSection;
-    FErrMsg : TIPCServerMsg;
-    procedure DoMessageQueued;
-    procedure DoMessageError;
+    // Access to the message is not locked by design, because in the threaded
+    // mode it should be accessed only during synchronous event callbacks.
+    FMessage: TIPCServerMsg;
+    FTempMessage: TIPCServerMsg;
+    FThreaded: Boolean;
+    FThreadTimeout: Integer;
+    FThreadError: String;
+    FThreadExecuting: Boolean;
+    FThreadReadyEvent: TSimpleEvent;
+    FThread: TThread;
+    FSynchronizeEvents: Boolean;
+    procedure DoOnMessage;
+    procedure DoOnMessageQueued;
+    procedure DoOnMessageError(Msg: TIPCServerMsg);
+    procedure DoOnThreadError;
+    procedure InternalDoOnMessage;
+    procedure InternalDoOnMessageQueued;
+    procedure InternalDoOnMessageError;
+    procedure InternalDoOnThreadError;
     function GetInstanceID: String;
     function GetMaxAction: TIPCMessageOverflowAction;
     function GetMaxQueue: Integer;
@@ -150,13 +172,23 @@
     procedure SetGlobal(const AValue: Boolean);
     procedure SetMaxAction(AValue: TIPCMessageOverflowAction);
     procedure SetMaxQueue(AValue: Integer);
-  Protected
+    procedure SetThreaded(AValue: Boolean);
+    procedure SetThreadTimeout(AValue: Integer);
+    procedure SetSynchronizeEvents(AValue: Boolean);
+    procedure CheckThreadContext;
+    function WaitForReady(Timeout: Cardinal): Boolean;
+    function WaitForReady: Boolean;
+    function GetMsgType: TMessageType;
+    function GetMsgData: TStream;
+  protected
     FIPCComm: TIPCServerComm;
-    procedure StartThread; virtual;
-    procedure StopThread; virtual;
     Function CommClass : TIPCServerCommClass; virtual;
     Procedure PushMessage(Msg : TIPCServerMsg); virtual;
     function PopMessage: Boolean; virtual;
+    procedure StartComm; virtual;
+    procedure StopComm; virtual;
+    function StartThread: Boolean; virtual;
+    procedure StopThread; virtual;
     Procedure Activate; override;
     Procedure Deactivate; override;
     Property Queue : TIPCServerMsgQueue Read FQueue;
@@ -164,17 +196,20 @@
   Public
     Constructor Create(AOwner : TComponent); override;
     Destructor Destroy; override;
-    Procedure StartServer(Threaded : Boolean = False);
+    Procedure StartServer;
+    Procedure StartServer(AThreaded: Boolean);
     Procedure StopServer;
     Function PeekMessage(TimeOut : Integer; DoReadMessage : Boolean): Boolean;
     Procedure ReadMessage;
     Property  StringMessage : String Read GetStringMessage;
     Procedure GetMessageData(Stream : TStream);
-    Property  MsgType: TMessageType Read FMsgType;
-    Property  MsgData : TStream Read FMsgData;
+    Property  Message: TIPCServerMsg read FMessage;
+    Property  MsgType: TMessageType Read GetMsgType;
+    Property  MsgData: TStream Read GetMsgData;
     Property  InstanceID : String Read GetInstanceID;
+    property  ThreadExecuting: Boolean read FThreadExecuting;
+    property  ThreadError: String read FThreadError;
   Published
-    Property ThreadTimeOut : Integer Read FThreadTimeOut Write FThreadTimeOut;
     Property Global : Boolean Read FGlobal Write SetGlobal;
     // Called during ReadMessage
     Property OnMessage : TNotifyEvent Read FOnMessage Write FOnMessage;
@@ -182,14 +217,21 @@
     Property OnMessageQueued : TNotifyEvent Read FOnMessageQueued Write FOnMessageQueued;
     // Called when the queue overflows and  MaxAction = ipcmoaError.
     Property OnMessageError : TMessageQueueEvent Read FOnMessageError Write FOnMessageError;
+    // Called when the server thread catches an exception.
+    property OnThreadError: TNotifyEvent read FOnThreadError write FOnThreadError;
     // Maximum number of messages to keep in the queue
-    property MaxQueue: Integer read GetMaxQueue write SetMaxQueue;
+    property MaxQueue: Integer read GetMaxQueue write SetMaxQueue default DefaultMaxQueue;
     // What to do when the queue overflows
-    property MaxAction: TIPCMessageOverflowAction read GetMaxAction write SetMaxAction;
+    property MaxAction: TIPCMessageOverflowAction read GetMaxAction write SetMaxAction default DefaultMaxAction;
+    // Instruct IPC server to operate in a threaded mode.
+    property Threaded: Boolean read FThreaded write SetThreaded;
+    // Amount of time thread waits for a message before checking for termination.
+    property ThreadTimeout: Integer read FThreadTimeout write SetThreadTimeout default DefaultThreadTimeout;
+    // Synchronize events with the main thread when in threaded mode.
+    property SynchronizeEvents: Boolean read FSynchronizeEvents write SetSynchronizeEvents default DefaultSynchronizeEvents;
   end;
 
-
-  { TIPCClientComm}
+  { TIPCClientComm }
   TIPCClientComm = Class(TObject)
   private
     FOwner: TSimpleIPCClient;
@@ -229,17 +271,23 @@
     Property  ServerInstance : String Read FServerInstance Write SetServerInstance;
   end;
 
-
   EIPCError = Class(Exception);
 
-Var
+var
   DefaultIPCServerClass : TIPCServerCommClass = Nil;
   DefaultIPCClientClass : TIPCClientCommClass = Nil;
 
+var
+  DefaultIPCMessageOverflowAction: TIPCMessageOverflowAction = TSimpleIPCServer.DefaultMaxAction;
+  DefaultIPCMessageQueueLimit: Integer = TSimpleIPCServer.DefaultMaxQueue;
+
 resourcestring
   SErrServerNotActive = 'Server with ID %s is not active.';
   SErrActive = 'This operation is illegal when the server is active.';
   SErrInActive = 'This operation is illegal when the server is inactive.';
+  SErrThreadContext = 'This operation is illegal outside of IPC thread context.';
+  SErrThreadFailure = 'IPC thread failure.';
+  SErrMessageQueueOverflow = 'Message queue overflow (limit %s)';
 
 
 implementation
@@ -252,35 +300,82 @@
   This comes first, to allow the uses clause to be set.
   If the include file defines OSNEEDIPCINITDONE then the unit will
   call IPCInit and IPCDone in the initialization/finalization code.
-  
   --------------------------------------------------------------------- }
 {$UNDEF OSNEEDIPCINITDONE}
 
 {$i simpleipc.inc}
 
-Resourcestring
-  SErrMessageQueueOverflow = 'Message queue overflow (limit %s)';
+// Convert content of any stream type to a string.
+function FastStreamToString(Stream: TStream): String;
+var
+  CharCount, CharSize: Integer;
+  StringStream: TStringStream;
+  OldPosition: Int64;
+begin
+  // Optimized for TStringStream
+  if Stream is TStringStream then
+  begin
+    Result := TStringStream(Stream).DataString;
+  end
+  // Optimized for TCustomMemoryStream
+  else if Stream is TCustomMemoryStream then
+  begin
+    Result := '';
+    CharSize := StringElementSize(Result);
+    CharCount := Stream.Size div CharSize;
+    SetLength(Result, CharCount);
+    Move(TCustomMemoryStream(Stream).Memory^, Result[1], CharCount * CharSize);
+  end
+  // Any other stream type
+  else
+  begin
+    OldPosition := Stream.Position;
+    try
+      StringStream := TStringStream.Create('');
+      try
+        Stream.Position := 0;
+        StringStream.CopyFrom(Stream, Stream.Size);
+        Result := StringStream.DataString;
+      finally
+        StringStream.Free;
+      end;
+    finally
+      Stream.Position := OldPosition;
+    end;
+  end;
+end;
 
-{ ---------------------------------------------------------------------
-    TIPCServerMsg
-  ---------------------------------------------------------------------}
+{$REGION 'TIPCServerMsg'}
 
-
 constructor TIPCServerMsg.Create;
 begin
-  FMsgType := 0;
-  FStream := TMemoryStream.Create;
+  FMsgType := mtUnknown;
+  FStream := Self.DefaultStreamClass.Create;
+  FOwnsStream := True;
 end;
 
+constructor TIPCServerMsg.Create(AStream: TStream; AOwnsStream: Boolean);
+begin
+  FMsgType := mtUnknown;
+  FStream := AStream;
+  FOwnsStream := AOwnsStream;
+end;
+
 destructor TIPCServerMsg.Destroy;
 begin
-  FStream.Free;
+  if FOwnsStream then
+    FreeAndNil(FStream);
 end;
 
-{ ---------------------------------------------------------------------
-    TIPCServerMsgQueue
-  ---------------------------------------------------------------------}
+function TIPCServerMsg.GetStringMessage: String;
+begin
+  Result := FastStreamToString(FStream);
+end;
 
+{$ENDREGION}
+
+{$REGION 'TIPCServerMsgQueue'}
+
 constructor TIPCServerMsgQueue.Create;
 begin
   FMaxCount := DefaultIPCMessageQueueLimit;
@@ -336,6 +431,7 @@
 
 procedure TIPCServerMsgQueue.Push(AItem: TIPCServerMsg);
 begin
+  // PrepareToPush may throw an exception, e.g. if message queue is full.
   if PrepareToPush then
     FList.Insert(0, AItem);
 end;
@@ -355,10 +451,9 @@
     Result := nil;
 end;
 
+{$ENDREGION}
 
-{ ---------------------------------------------------------------------
-    TIPCServerComm
-  ---------------------------------------------------------------------}
+{$REGION 'TIPCServerComm'}
 
 constructor TIPCServerComm.Create(AOwner: TSimpleIPCServer);
 begin
@@ -366,16 +461,13 @@
 end;
 
 procedure TIPCServerComm.DoError(const Msg: String; const Args: array of const);
-
 begin
   FOwner.DoError(Msg,Args);
 end;
 
 procedure TIPCServerComm.PushMessage(const Hdr: TMsgHeader; AStream: TStream);
-
-Var
+var
   M : TIPCServerMsg;
-
 begin
   M:=TIPCServerMsg.Create;
   try
@@ -394,9 +486,9 @@
   FOwner.PushMessage(Msg);
 end;
 
-{ ---------------------------------------------------------------------
-    TIPCClientComm
-  ---------------------------------------------------------------------}
+{$ENDREGION}
+
+{$REGION 'TIPCClientComm'}
   
 constructor TIPCClientComm.Create(AOwner: TSimpleIPCClient);
 begin
@@ -409,10 +501,10 @@
   FOwner.DoError(Msg,Args);
 end;  
 
-{ ---------------------------------------------------------------------
-    TSimpleIPC
-  ---------------------------------------------------------------------}
+{$ENDREGION}
 
+{$REGION 'TSimpleIPC'}
+
 Procedure TSimpleIPC.DoError(const Msg: String; const Args: array of const);
 var
   FullMsg: String;
@@ -441,7 +533,7 @@
 procedure TSimpleIPC.SetActive(const AValue: Boolean);
 begin
   if (FActive<>AValue) then
-    begin
+  begin
     if ([]<>([csLoading,csDesigning]*ComponentState)) then
       FActive:=AValue
     else  
@@ -449,37 +541,90 @@
         Activate
       else
         Deactivate;
-    end;
+  end;
 end;
 
 procedure TSimpleIPC.SetServerID(const AValue: String);
 begin
   if (FServerID<>AValue) then
-    begin
+  begin
     CheckInactive;
-    FServerID:=AValue
-    end;
+    FServerID:=AValue;
+  end;
 end;
 
-Procedure TSimpleIPC.Loaded; 
+procedure TSimpleIPC.PrepareServerID;
+begin
+  if FServerID = '' then
+    FServerID := ApplicationName;
+  // Extra precaution for thread-safety
+  UniqueString(FServerID);
+end;
 
-Var
+procedure TSimpleIPC.Loaded;
+var
   B : Boolean;
-
 begin
-  Inherited;
+  inherited;
   B:=FActive;
   if B then
-    begin
-    Factive:=False;
+  begin
+    FActive:=False;
     Activate;
+  end;
+end;
+
+{$ENDREGION}
+
+{$REGION 'TIPCServerThread'}
+
+type
+  TIPCServerThread = class(TThread)
+  private
+    FServer: TSimpleIPCServer;
+  protected
+    procedure Execute; override;
+  public
+    constructor Create(AServer: TSimpleIPCServer);
+    property Server: TSimpleIPCServer read FServer;
+  end;
+
+constructor TIPCServerThread.Create(AServer: TSimpleIPCServer);
+begin
+  inherited Create(True); // CreateSuspended = True
+  FServer := AServer;
+end;
+
+procedure TIPCServerThread.Execute;
+begin
+  FServer.FThreadExecuting := True;
+  try
+    FServer.StartComm;
+    try
+      // Notify server that thread has started.
+      FServer.FThreadReadyEvent.SetEvent;
+      // Run message loop
+      while not Terminated do
+        FServer.PeekMessage(FServer.ThreadTimeout, True);
+    finally
+      FServer.StopComm;
     end;
+  except on E: Exception do
+    begin
+      FServer.FThreadExecuting := False;
+      FServer.FThreadError := E.Message;
+      // Trigger event to wake up the caller from potentially indefinite wait.
+      FServer.FThreadReadyEvent.SetEvent;
+      FServer.DoOnThreadError;
+    end;
+  end;
+  FServer.FThreadExecuting := False;
 end;
 
-{ ---------------------------------------------------------------------
-    TSimpleIPCServer
-  ---------------------------------------------------------------------}
+{$ENDREGION}
 
+{$REGION 'TSimpleIPCServer'}
+
 constructor TSimpleIPCServer.Create(AOwner: TComponent);
 begin
   inherited Create(AOwner);
@@ -486,9 +631,11 @@
   FGlobal:=False;
   FActive:=False;
   FBusy:=False;
-  FMsgData:=TStringStream.Create('');
+  FMessage:=nil;
   FQueue:=TIPCServerMsgQueue.Create;
-  FThreadTimeOut:=DefaultThreadTimeOut;
+  FThreaded:=DefaultThreaded;
+  FThreadTimeout:=DefaultThreadTimeout;
+  FSynchronizeEvents:=DefaultSynchronizeEvents;
 end;
 
 destructor TSimpleIPCServer.Destroy;
@@ -495,26 +642,44 @@
 begin
   Active:=False;
   FreeAndNil(FQueue);
-  FreeAndNil(FMsgData);
+  if Assigned(FMessage) then
+    FreeAndNil(FMessage);
   inherited Destroy;
 end;
 
 procedure TSimpleIPCServer.SetGlobal(const AValue: Boolean);
 begin
-  if (FGlobal<>AValue) then
-    begin
-    CheckInactive;
-    FGlobal:=AValue;
-    end;
+  CheckInactive;
+  FGlobal:=AValue;
 end;
 
+procedure TSimpleIPCServer.SetThreaded(AValue: Boolean);
+begin
+  CheckInactive;
+  FThreaded:=AValue;
+end;
+
+procedure TSimpleIPCServer.SetThreadTimeout(AValue: Integer);
+begin
+  CheckInactive;
+  FThreadTimeout:=AValue;
+end;
+
+procedure TSimpleIPCServer.SetSynchronizeEvents(AValue: Boolean);
+begin
+  CheckInactive;
+  FSynchronizeEvents:=AValue;
+end;
+
 procedure TSimpleIPCServer.SetMaxAction(AValue: TIPCMessageOverflowAction);
 begin
+  CheckInactive;
   FQueue.MaxAction:=AValue;
 end;
 
 procedure TSimpleIPCServer.SetMaxQueue(AValue: Integer);
 begin
+  CheckInactive;
   FQueue.MaxCount:=AValue;
 end;
 
@@ -533,86 +698,98 @@
   Result:=FQueue.MaxCount;
 end;
 
+procedure TSimpleIPCServer.CheckThreadContext;
+begin
+  // Check that the caller is in the IPC thread context.
+  if Assigned(FThread) then
+  begin
+    if System.GetThreadID <> FThread.ThreadID then
+      raise EIPCError.Create(SErrThreadContext);
+  end;
+end;
 
-function TSimpleIPCServer.GetStringMessage: String;
+procedure TSimpleIPCServer.StartComm;
 begin
-  Result:=TStringStream(FMsgData).DataString;
+  if Assigned(FIPCComm) then
+    FreeAndNil(FIPCComm);
+  FIPCComm := CommClass.Create(Self);
+  FIPCComm.StartServer;
 end;
 
+procedure TSimpleIPCServer.StopComm;
+begin
+  if Assigned(FIPCComm) then
+  begin
+    FIPCComm.StopServer;
+    FreeAndNil(FIPCComm);
+  end;
+end;
 
-procedure TSimpleIPCServer.StartServer(Threaded : Boolean = False);
+function TSimpleIPCServer.StartThread: Boolean;
 begin
-  if Not Assigned(FIPCComm) then
-    begin
-    If (FServerID='') then
-      FServerID:=ApplicationName;
-    FIPCComm:=CommClass.Create(Self);
-    FIPCComm.StartServer;
-    end;
-  FActive:=True;
-  If Threaded then
-    StartThread;
+  FThreadError := '';
+  FThreadReadyEvent := SyncObjs.TSimpleEvent.Create;
+  FThread := TIPCServerThread.Create(Self);
+  FThread.Start;
+  Result := WaitForReady;
 end;
 
-Type
-
-  { TServerThread }
-
-  TServerThread = Class(TThread)
-  private
-    FServer: TSimpleIPCServer;
-    FThreadTimeout: Integer;
-  Public
-    Constructor Create(AServer : TSimpleIPCServer; ATimeout : integer);
-    procedure Execute; override;
-    Property Server : TSimpleIPCServer Read FServer;
-    Property ThreadTimeout : Integer Read FThreadTimeout;
+procedure TSimpleIPCServer.StopThread;
+begin
+  if Assigned(FThread) then
+  begin
+    FThread.Terminate;
+    FThread.WaitFor;
+    FreeAndNil(FThread);
   end;
+  if Assigned(FThreadReadyEvent) then
+    FreeAndNil(FThreadReadyEvent);
+end;
 
-{ TServerThread }
-
-constructor TServerThread.Create(AServer: TSimpleIPCServer; ATimeout: integer);
+function TSimpleIPCServer.WaitForReady(Timeout: Cardinal): Boolean;
 begin
-  FServer:=AServer;
-  FThreadTimeout:=ATimeOut;
-  Inherited Create(False);
+  if FThreadReadyEvent.WaitFor(Timeout) = wrSignaled then
+    Result := FThreadExecuting
+  else
+    Result := False;
 end;
 
-procedure TServerThread.Execute;
+function TSimpleIPCServer.WaitForReady: Boolean;
 begin
-  While Not Terminated do
-    FServer.PeekMessage(ThreadTimeout,False);
+  Result := WaitForReady(SyncObjs.INFINITE);
 end;
 
-procedure TSimpleIPCServer.StartThread;
-
+procedure TSimpleIPCServer.StartServer;
 begin
-  InitCriticalSection(FLock);
-  FThread:=TServerThread.Create(Self,ThreadTimeOut);
+  StartServer(FThreaded);
 end;
 
-procedure TSimpleIPCServer.StopThread;
-
+procedure TSimpleIPCServer.StartServer(AThreaded: Boolean);
 begin
-  if Assigned(FThread) then
+  CheckInactive;
+  FActive := True;
+  try
+    PrepareServerID;
+    FThreaded := AThreaded;
+    if FThreaded then
     begin
-    FThread.Terminate;
-    FThread.WaitFor;
-    FreeAndNil(FThread);
-    DoneCriticalSection(FLock);
-    end;
+      if not StartThread then
+        raise EIPCError.Create(SErrThreadFailure);
+    end
+    else
+      StartComm;
+  except
+    FActive := False;
+    raise;
+  end;
 end;
 
 procedure TSimpleIPCServer.StopServer;
 begin
   StopThread;
-  If Assigned(FIPCComm) then
-    begin
-    FIPCComm.StopServer;
-    FreeAndNil(FIPCComm);
-    end;
+  StopComm;
   FQueue.Clear;
-  FActive:=False;
+  FActive := False;
 end;
 
 // TimeOut values:
@@ -622,10 +799,11 @@
 //   < -1  -- wait infinitely (force to -1)
 function TSimpleIPCServer.PeekMessage(TimeOut: Integer; DoReadMessage: Boolean): Boolean;
 begin
+  CheckThreadContext;
   CheckActive;
   Result:=Queue.Count>0;
   If Not Result then
-    begin
+  begin
     if TimeOut < -1 then
       TimeOut := -1;
     FBusy:=True;
@@ -634,7 +812,7 @@
     Finally
       FBusy:=False;
     end;
-    end;
+  end;
   If Result then
     If DoReadMessage then
       Readmessage;
@@ -641,55 +819,83 @@
 end;
 
 function TSimpleIPCServer.PopMessage: Boolean;
-
-var
-  MsgItem: TIPCServerMsg;
-  DoLock : Boolean;
-
 begin
-  DoLock:=Assigned(FThread);
-  if DoLock then
-    EnterCriticalsection(Flock);
-  try
-    MsgItem:=FQueue.Pop;
-  finally
-    if DoLock then
-      LeaveCriticalsection(FLock);
-  end;
-  Result:=Assigned(MsgItem);
-  if Result then
-    try
-      FMsgType := MsgItem.MsgType;
-      MsgItem.Stream.Position := 0;
-      FMsgData.Size := 0;
-      FMsgData.CopyFrom(MsgItem.Stream, MsgItem.Stream.Size);
-    finally
-      MsgItem.Free;
-    end;
+  CheckThreadContext;
+  // Access to the message is not locked by design!
+  if Assigned(FMessage) then
+    FreeAndNil(FMessage);
+  FMessage := FQueue.Pop;
+  Result := Assigned(FMessage);
 end;
 
 procedure TSimpleIPCServer.ReadMessage;
-
 begin
+  CheckThreadContext;
   CheckActive;
   FBusy:=True;
   Try
     if (FQueue.Count=0) then
-      // Readmessage pushes a message to the queue
+      // ReadMessage pushes a message to the queue
       FIPCComm.ReadMessage;
     if PopMessage then
-      If Assigned(FOnMessage) then
-        FOnMessage(Self);
+      DoOnMessage;
   Finally
     FBusy:=False;
   end;
 end;
 
+procedure TSimpleIPCServer.PushMessage(Msg: TIPCServerMsg);
+var
+  Error: Boolean;
+begin
+  Error := False;
+  try
+    // Queue.Push may throw an exception, e.g. if message queue is full.
+    Queue.Push(Msg);
+  except
+    Error := True;
+  end;
+  if Error then
+    // Handler must free the Msg, because it is not owned by anybody.
+    DoOnMessageError(Msg)
+  else
+    DoOnMessageQueued;
+end;
+
+function TSimpleIPCServer.GetMsgType: TMessageType;
+begin
+  // Access to the message is not locked by design!
+  if Assigned(FMessage) then
+    Result := FMessage.MsgType
+  else
+    Result := mtUnknown;
+end;
+
+function TSimpleIPCServer.GetMsgData: TStream;
+begin
+  // Access to the message is not locked by design!
+  if Assigned(FMessage) then
+    Result := FMessage.Stream
+  else
+    Result := nil;
+end;
+
 procedure TSimpleIPCServer.GetMessageData(Stream: TStream);
 begin
-  Stream.CopyFrom(FMsgData,0);
+  // Access to the message is not locked by design!
+  if Assigned(FMessage) then
+    Stream.CopyFrom(FMessage.Stream, 0);
 end;
 
+function TSimpleIPCServer.GetStringMessage: String;
+begin
+  // Access to the message is not locked by design!
+  if Assigned(FMessage) then
+    Result := FMessage.StringMessage
+  else
+    Result := '';
+end;
+
 procedure TSimpleIPCServer.Activate;
 begin
   StartServer;
@@ -700,62 +906,87 @@
   StopServer;
 end;
 
+procedure TSimpleIPCServer.DoOnMessage;
+begin
+  if Assigned(FOnMessage) then
+  begin
+    if FSynchronizeEvents and Assigned(FThread) then
+      TThread.Synchronize(FThread, @InternalDoOnMessage)
+    else
+      InternalDoOnMessage;
+  end;
+end;
 
-procedure TSimpleIPCServer.DoMessageQueued;
+procedure TSimpleIPCServer.InternalDoOnMessage;
+begin
+  if Assigned(FOnMessage) then
+    FOnMessage(Self);
+end;
 
+procedure TSimpleIPCServer.DoOnMessageQueued;
 begin
   if Assigned(FOnMessageQueued) then
+  begin
+    if FSynchronizeEvents and Assigned(FThread) then
+      TThread.Synchronize(FThread, @InternalDoOnMessageQueued)
+    else
+      InternalDoOnMessageQueued;
+  end;
+end;
+
+procedure TSimpleIPCServer.InternalDoOnMessageQueued;
+begin
+  if Assigned(FOnMessageQueued) then
     FOnMessageQueued(Self);
 end;
 
-procedure TSimpleIPCServer.DoMessageError;
+procedure TSimpleIPCServer.DoOnMessageError(Msg: TIPCServerMsg);
 begin
   try
-    if Assigned(FOnMessageQueued) then
-      FOnMessageError(Self,FErrMsg);
+    if Assigned(FOnMessageError) then
+    begin
+      // Temp message (class instance variable) is used to pass
+      // a parameter to a synchronized thread method.
+      FTempMessage := Msg;
+      if FSynchronizeEvents and Assigned(FThread) then
+        TThread.Synchronize(FThread, @InternalDoOnMessageError)
+      else
+        InternalDoOnMessageError;
+    end;
   finally
-    FreeAndNil(FErrMsg)
+    // Must free the message because it is not owned by anybody.
+    FTempMessage := nil;
+    FreeAndNil(Msg);
   end;
 end;
 
-procedure TSimpleIPCServer.PushMessage(Msg: TIPCServerMsg);
+procedure TSimpleIPCServer.InternalDoOnMessageError;
+begin
+  if Assigned(FOnMessageError) then
+    FOnMessageError(Self, FTempMessage);
+end;
 
-Var
-  DoLock : Boolean;
-
+procedure TSimpleIPCServer.DoOnThreadError;
 begin
-  try
-    DoLock:=Assigned(FThread);
-    If DoLock then
-      EnterCriticalsection(FLock);
-    try
-      Queue.Push(Msg);
-    finally
-      If DoLock then
-        LeaveCriticalsection(FLock);
-    end;
-    if DoLock then
-      TThread.Synchronize(FThread,@DoMessageQueued)
+  if Assigned(FOnThreadError) then
+  begin
+    if FSynchronizeEvents and Assigned(FThread) then
+      TThread.Synchronize(FThread, @InternalDoOnThreadError)
     else
-      DoMessageQueued;
-  except
-    On E : Exception do
-      FErrMsg:=Msg;
+      InternalDoOnThreadError;
   end;
-  if Assigned(FErrMsg) then
-    if DoLock then
-      TThread.Synchronize(FThread,@DoMessageError)
-    else
-      DoMessageQueued;
+end;
 
+procedure TSimpleIPCServer.InternalDoOnThreadError;
+begin
+  if Assigned(FOnThreadError) then
+    FOnThreadError(Self);
 end;
 
+{$ENDREGION}
 
+{$REGION 'TSimpleIPCClient'}
 
-{ ---------------------------------------------------------------------
-    TSimpleIPCClient
-  ---------------------------------------------------------------------}
-
 procedure TSimpleIPCClient.SetServerInstance(const AValue: String);
 begin
   CheckInactive;
@@ -776,7 +1007,7 @@
   inherited Create(AOwner);
 end;
 
-destructor TSimpleIPCClient.destroy;
+destructor TSimpleIPCClient.Destroy;
 begin
   Active:=False;
   Inherited;
@@ -785,7 +1016,8 @@
 procedure TSimpleIPCClient.Connect;
 begin
   If Not assigned(FIPCComm) then
-    begin
+  begin
+    PrepareServerID;
     FIPCComm:=CommClass.Create(Self);
     Try
       FIPCComm.Connect;
@@ -794,7 +1026,7 @@
       Raise;
     end;  
     FActive:=True;
-    end;
+  end;
 end;
 
 procedure TSimpleIPCClient.Disconnect;
@@ -809,21 +1041,24 @@
 end;
 
 function TSimpleIPCClient.ServerRunning: Boolean;
-
+var
+  TempComm: TIPCClientComm;
 begin
   If Assigned(FIPCComm) then
     Result:=FIPCComm.ServerRunning
   else
-    With CommClass.Create(Self) do
-      Try
-        Result:=ServerRunning;
-      finally
-        Free;
-      end;
+  begin
+    PrepareServerID;
+    TempComm := CommClass.Create(Self);
+    Try
+      Result := TempComm.ServerRunning;
+    finally
+      TempComm.Free;
+    end;
+  end;
 end;
 
 procedure TSimpleIPCClient.SendMessage(MsgType : TMessageType; Stream: TStream);
-
 begin
   CheckActive;
   FBusy:=True;
@@ -839,8 +1074,7 @@
   SendStringMessage(mtString,Msg);
 end;
 
-procedure TSimpleIPCClient.SendStringMessage(MsgType: TMessageType; const Msg: String
-  );
+procedure TSimpleIPCClient.SendStringMessage(MsgType: TMessageType; const Msg: String);
 Var
   S : TStringStream;
 begin
@@ -864,11 +1098,14 @@
   SendStringMessage(MsgType, Format(Msg,Args));
 end;
 
+{$ENDREGION}
+
 {$IFDEF OSNEEDIPCINITDONE}
 initialization
   IPCInit;
 finalization
   IPCDone;
-{$ENDIF}  
+{$ENDIF}
+
 end.
 
simpleipc.pp.patch (28,214 bytes)

Denis Kozlov

2017-07-10 00:45

reporter  

ThreadedIPC.zip (97,398 bytes)

Denis Kozlov

2017-07-10 00:46

reporter  

ThreadedIPCGUI.zip (861,345 bytes)

Michael Van Canneyt

2017-07-11 10:13

administrator   ~0101662

Thank you for the patch.

Unfortunately, I cannot commit it as-is. There are 2 things preventing this:

1. Your threadedipc program hangs every other run (1 out of 2).
2. the ipcserver program that is provided in fcl-process no longer works in threaded mode.

Tested on Linux.

Denis Kozlov

2017-08-10 12:06

reporter   ~0102131

I have changed the threaded implementation to align with your recommendations.

Several additional changes were necessary on top of my previous work:

1. Changed behavior of `PeekMessage` and `ReadMessage` methods of both IPC Server and IPC Driver to improve thread safety and isolation of responsibilities. See below for the new design specs.
2. Amiga IPC driver: Push messages into message queue, instead of attempting to overwrite the current message data in IPC server instance. I don't have Amiga, so this should be tested by somebody!
3. Unix IPC driver: PeekMessage only checks for messages, it no longer reads messages - an action which should be performed by a separate call to ReadMessage. This avoids the ambiguity of "who" reads the message and potential deadlocks.

New design specs for the key methods in IPC server and driver:

* `Driver.PeekMessage` - Check if a new message is available. May read and push new messages to the queue, depending on the platform/implementation.
* `Driver.ReadMessage` - Read and push a new message to the queue, ONLY IF `Driver.PeekMessage` has not already done so.
* `Server.PeekMessage` - Returns immediately if there is a queued message. Otherwise, wait until a new message is available via IPC driver. This pushes new messages to the queue and calls `OnMessageQueued` event. Optionally, calls `Server.ReadMessage` if a message is available in the queue.
* `Server.ReadMessage` - Pop a message from the queue and call `OnMessage` event.

I have identified one more significant issue with IPC drivers that needs to be addressed, but I'll discuss this separately because it is not directly connected to this work here.

In relation to the previous comments https://bugs.freepascal.org/view.php?id=31805#c101662 : Examples `ipcserver` and `ipcclient` now work unmodified, tested on Windows and Linux. My `threadedipc` program may still hand from time to time due to a limitation of the IPC driver on Linux and OS2 (the significant issue mentioned earlier).

Uploaded 2 new patches (date stamped) and updated `threadedipc`/`threadedipcgui` example programs (v2).

Denis Kozlov

2017-08-10 12:07

reporter  

2017-08-10-simpleipc.patch (31,427 bytes)
Index: packages/fcl-process/src/simpleipc.pp
===================================================================
--- packages/fcl-process/src/simpleipc.pp	(revision 36715)
+++ packages/fcl-process/src/simpleipc.pp	(working copy)
@@ -20,13 +20,12 @@
 interface
 
 uses
-  Contnrs, Classes, SysUtils;
+  Contnrs, SyncObjs, Classes, SysUtils;
 
-Const
+const
   MsgVersion = 1;
-  DefaultThreadTimeOut = 50;
 
-  //Message types
+  { IPC message types }
   mtUnknown = 0;
   mtString = 1;
 
@@ -33,13 +32,8 @@
 type
   TIPCMessageOverflowAction = (ipcmoaNone, ipcmoaDiscardOld, ipcmoaDiscardNew, ipcmoaError);
 
-var
-  DefaultIPCMessageOverflowAction: TIPCMessageOverflowAction = ipcmoaNone;
-  DefaultIPCMessageQueueLimit: Integer = 0;
+  TMessageType = LongInt;
 
-Type
-
-  TMessageType = LongInt;
   TMsgHeader = Packed record
     Version : Byte;
     MsgType : TMessageType;
@@ -49,17 +43,29 @@
   TSimpleIPCServer = class;
   TSimpleIPCClient = class;
 
+  { TIPCServerMsg }
   TIPCServerMsg = class
+  private type
+    TStreamClass = class of TStream;
+  private const
+    // TMemoryStream uses an effecient grow algorithm.
+    DefaultStreamClass: TStreamClass = TMemoryStream;
   strict private
     FStream: TStream;
+    FOwnsStream: Boolean;
     FMsgType: TMessageType;
+    function GetStringMessage: String;
   public
     constructor Create;
+    constructor Create(AStream: TStream; AOwnsStream: Boolean = True);
     destructor Destroy; override;
     property Stream: TStream read FStream;
     property MsgType: TMessageType read FMsgType write FMsgType;
+    property OwnsStream: Boolean read FOwnsStream write FOwnsStream;
+    property StringMessage: String read GetStringMessage;
   end;
 
+  { TIPCServerMsgQueue }
   TIPCServerMsgQueue = class
   strict private
     FList: TFPObjectList;
@@ -80,7 +86,6 @@
   end;
 
   { TIPCServerComm }
-  
   TIPCServerComm = Class(TObject)
   Private
     FOwner  : TSimpleIPCServer;
@@ -94,10 +99,10 @@
     Property Owner : TSimpleIPCServer read FOwner;
     Procedure StartServer; virtual; Abstract;
     Procedure StopServer;virtual; Abstract;
-    // May push messages on the queue
-    Function  PeekMessage(TimeOut : Integer) : Boolean;virtual; Abstract;
-    // Must put message on the queue.
-    Procedure ReadMessage ;virtual; Abstract;
+    // Check for new messages, may read and push messages to the queue.
+    Function PeekMessage(Timeout: Integer): Boolean; virtual; Abstract;
+    // Read and push new message to the queue, if not done by PeekMessage.
+    Procedure ReadMessage; virtual; Abstract;
     Property InstanceID : String read GetInstanceID;
   end;
   TIPCServerCommClass = Class of TIPCServerComm;
@@ -111,6 +116,7 @@
     FBusy: Boolean;
     FActive : Boolean;
     FServerID : String;
+    procedure PrepareServerID;
     Procedure DoError(const Msg: String; const Args: array of const);
     Procedure CheckInactive;
     Procedure CheckActive;
@@ -123,26 +129,44 @@
     Property ServerID : String Read FServerID Write SetServerID;
   end;
 
+  TMessageQueueEvent = Procedure(Sender: TObject; Msg: TIPCServerMsg) of object;
+
   { TSimpleIPCServer }
-
-  TMessageQueueEvent = Procedure(Sender : TObject; Msg : TIPCServerMsg) of object;
-
   TSimpleIPCServer = Class(TSimpleIPC)
-  protected
-  Private
+  private const
+    DefaultThreaded = False;
+    DefaultThreadTimeout = 50;
+    DefaultSynchronizeEvents = True;
+    DefaultMaxAction = ipcmoaNone;
+    DefaultMaxQueue = 0;
+  private
     FOnMessageError: TMessageQueueEvent;
     FOnMessageQueued: TNotifyEvent;
-    FQueue : TIPCServerMsgQueue;
+    FOnMessage: TNotifyEvent;
+    FOnThreadError: TNotifyEvent;
+    FQueue: TIPCServerMsgQueue;
+    FQueueLock: TCriticalSection;
+    FQueueAddEvent: TSimpleEvent;
     FGlobal: Boolean;
-    FOnMessage: TNotifyEvent;
-    FMsgType: TMessageType;
-    FMsgData : TStream;
-    FThreadTimeOut: Integer;
-    FThread : TThread;
-    FLock : TRTLCriticalSection;
-    FErrMsg : TIPCServerMsg;
-    procedure DoMessageQueued;
-    procedure DoMessageError;
+    // Access to the message is not locked by design!
+    // In the threaded mode, it must be accessed only during event callbacks.
+    FMessage: TIPCServerMsg;
+    FTempMessage: TIPCServerMsg;
+    FThreaded: Boolean;
+    FThreadTimeout: Integer;
+    FThreadError: String;
+    FThreadExecuting: Boolean;
+    FThreadReadyEvent: TSimpleEvent;
+    FThread: TThread;
+    FSynchronizeEvents: Boolean;
+    procedure DoOnMessage;
+    procedure DoOnMessageQueued;
+    procedure DoOnMessageError(Msg: TIPCServerMsg);
+    procedure DoOnThreadError;
+    procedure InternalDoOnMessage;
+    procedure InternalDoOnMessageQueued;
+    procedure InternalDoOnMessageError;
+    procedure InternalDoOnThreadError;
     function GetInstanceID: String;
     function GetMaxAction: TIPCMessageOverflowAction;
     function GetMaxQueue: Integer;
@@ -150,31 +174,43 @@
     procedure SetGlobal(const AValue: Boolean);
     procedure SetMaxAction(AValue: TIPCMessageOverflowAction);
     procedure SetMaxQueue(AValue: Integer);
-  Protected
+    procedure SetThreaded(AValue: Boolean);
+    procedure SetThreadTimeout(AValue: Integer);
+    procedure SetSynchronizeEvents(AValue: Boolean);
+    function WaitForReady(Timeout: Integer = -1): Boolean;
+    function GetMsgType: TMessageType;
+    function GetMsgData: TStream;
+  protected
     FIPCComm: TIPCServerComm;
-    procedure StartThread; virtual;
-    procedure StopThread; virtual;
     Function CommClass : TIPCServerCommClass; virtual;
     Procedure PushMessage(Msg : TIPCServerMsg); virtual;
     function PopMessage: Boolean; virtual;
+    procedure StartComm; virtual;
+    procedure StopComm; virtual;
+    function StartThread: Boolean; virtual;
+    procedure StopThread; virtual;
     Procedure Activate; override;
     Procedure Deactivate; override;
+    function ProcessMessage(Timeout: Integer): Boolean;
     Property Queue : TIPCServerMsgQueue Read FQueue;
     Property Thread : TThread Read FThread;
   Public
     Constructor Create(AOwner : TComponent); override;
     Destructor Destroy; override;
-    Procedure StartServer(Threaded : Boolean = False);
+    Procedure StartServer;
+    Procedure StartServer(AThreaded: Boolean);
     Procedure StopServer;
-    Function PeekMessage(TimeOut : Integer; DoReadMessage : Boolean): Boolean;
-    Procedure ReadMessage;
+    Function PeekMessage(Timeout: Integer; DoReadMessage: Boolean): Boolean;
+    Function ReadMessage: Boolean;
     Property  StringMessage : String Read GetStringMessage;
     Procedure GetMessageData(Stream : TStream);
-    Property  MsgType: TMessageType Read FMsgType;
-    Property  MsgData : TStream Read FMsgData;
+    Property  Message: TIPCServerMsg read FMessage;
+    Property  MsgType: TMessageType Read GetMsgType;
+    Property  MsgData: TStream Read GetMsgData;
     Property  InstanceID : String Read GetInstanceID;
+    property  ThreadExecuting: Boolean read FThreadExecuting;
+    property  ThreadError: String read FThreadError;
   Published
-    Property ThreadTimeOut : Integer Read FThreadTimeOut Write FThreadTimeOut;
     Property Global : Boolean Read FGlobal Write SetGlobal;
     // Called during ReadMessage
     Property OnMessage : TNotifyEvent Read FOnMessage Write FOnMessage;
@@ -182,14 +218,21 @@
     Property OnMessageQueued : TNotifyEvent Read FOnMessageQueued Write FOnMessageQueued;
     // Called when the queue overflows and  MaxAction = ipcmoaError.
     Property OnMessageError : TMessageQueueEvent Read FOnMessageError Write FOnMessageError;
+    // Called when the server thread catches an exception.
+    property OnThreadError: TNotifyEvent read FOnThreadError write FOnThreadError;
     // Maximum number of messages to keep in the queue
-    property MaxQueue: Integer read GetMaxQueue write SetMaxQueue;
+    property MaxQueue: Integer read GetMaxQueue write SetMaxQueue default DefaultMaxQueue;
     // What to do when the queue overflows
-    property MaxAction: TIPCMessageOverflowAction read GetMaxAction write SetMaxAction;
+    property MaxAction: TIPCMessageOverflowAction read GetMaxAction write SetMaxAction default DefaultMaxAction;
+    // Instruct IPC server to operate in a threaded mode.
+    property Threaded: Boolean read FThreaded write SetThreaded;
+    // Amount of time thread waits for a message before checking for termination.
+    property ThreadTimeout: Integer read FThreadTimeout write SetThreadTimeout default DefaultThreadTimeout;
+    // Synchronize events with the main thread when in threaded mode.
+    property SynchronizeEvents: Boolean read FSynchronizeEvents write SetSynchronizeEvents default DefaultSynchronizeEvents;
   end;
 
-
-  { TIPCClientComm}
+  { TIPCClientComm }
   TIPCClientComm = Class(TObject)
   private
     FOwner: TSimpleIPCClient;
@@ -229,17 +272,23 @@
     Property  ServerInstance : String Read FServerInstance Write SetServerInstance;
   end;
 
-
   EIPCError = Class(Exception);
 
-Var
+var
   DefaultIPCServerClass : TIPCServerCommClass = Nil;
   DefaultIPCClientClass : TIPCClientCommClass = Nil;
 
+var
+  DefaultIPCMessageOverflowAction: TIPCMessageOverflowAction = TSimpleIPCServer.DefaultMaxAction;
+  DefaultIPCMessageQueueLimit: Integer = TSimpleIPCServer.DefaultMaxQueue;
+
 resourcestring
   SErrServerNotActive = 'Server with ID %s is not active.';
   SErrActive = 'This operation is illegal when the server is active.';
   SErrInActive = 'This operation is illegal when the server is inactive.';
+  SErrThreadContext = 'This operation is illegal outside of IPC thread context.';
+  SErrThreadFailure = 'IPC thread failure.';
+  SErrMessageQueueOverflow = 'Message queue overflow (limit %s)';
 
 
 implementation
@@ -252,35 +301,108 @@
   This comes first, to allow the uses clause to be set.
   If the include file defines OSNEEDIPCINITDONE then the unit will
   call IPCInit and IPCDone in the initialization/finalization code.
-  
   --------------------------------------------------------------------- }
 {$UNDEF OSNEEDIPCINITDONE}
 
 {$i simpleipc.inc}
 
-Resourcestring
-  SErrMessageQueueOverflow = 'Message queue overflow (limit %s)';
+// Convert content of any stream type to a string.
+function FastStreamToString(Stream: TStream): String;
+var
+  CharCount, CharSize: Integer;
+  StringStream: TStringStream;
+  OldPosition: Int64;
+begin
+  // Optimized for TStringStream
+  if Stream is TStringStream then
+  begin
+    Result := TStringStream(Stream).DataString;
+  end
+  // Optimized for TCustomMemoryStream
+  else if Stream is TCustomMemoryStream then
+  begin
+    Result := '';
+    CharSize := StringElementSize(Result);
+    CharCount := Stream.Size div CharSize;
+    SetLength(Result, CharCount);
+    Move(TCustomMemoryStream(Stream).Memory^, Result[1], CharCount * CharSize);
+  end
+  // Any other stream type
+  else
+  begin
+    OldPosition := Stream.Position;
+    try
+      StringStream := TStringStream.Create('');
+      try
+        Stream.Position := 0;
+        StringStream.CopyFrom(Stream, Stream.Size);
+        Result := StringStream.DataString;
+      finally
+        StringStream.Free;
+      end;
+    finally
+      Stream.Position := OldPosition;
+    end;
+  end;
+end;
 
-{ ---------------------------------------------------------------------
-    TIPCServerMsg
-  ---------------------------------------------------------------------}
+// Timeout values:
+//   >  0  -- Number of milliseconds to wait
+//   =  0  -- return immediately
+//   = -1  -- wait infinitely (converted to INFINITE)
+//   < -1  -- wait infinitely (converted to INFINITE)
+function IPCTimeoutToEventTimeout(Timeout: Integer): Cardinal; inline;
+begin
+  if Timeout >= 0 then
+    Result := Timeout
+  else
+    Result := SyncObjs.INFINITE;
+end;
 
+// Timeout values:
+//   >  0  -- Number of milliseconds to wait
+//   =  0  -- return immediately
+//   = -1  -- wait infinitely
+//   < -1  -- wait infinitely (force to -1)
+function IPCTimeoutSanitized(Timeout: Integer): Integer; inline;
+begin
+  if Timeout >= 0 then
+    Result := Timeout
+  else
+    Result := -1;
+end;
 
+{$REGION 'TIPCServerMsg'}
+
 constructor TIPCServerMsg.Create;
 begin
-  FMsgType := 0;
-  FStream := TMemoryStream.Create;
+  FMsgType := mtUnknown;
+  FStream := Self.DefaultStreamClass.Create;
+  FOwnsStream := True;
 end;
 
+constructor TIPCServerMsg.Create(AStream: TStream; AOwnsStream: Boolean);
+begin
+  FMsgType := mtUnknown;
+  FStream := AStream;
+  FOwnsStream := AOwnsStream;
+end;
+
 destructor TIPCServerMsg.Destroy;
 begin
-  FStream.Free;
+  if FOwnsStream then
+    FreeAndNil(FStream);
 end;
 
-{ ---------------------------------------------------------------------
-    TIPCServerMsgQueue
-  ---------------------------------------------------------------------}
+function TIPCServerMsg.GetStringMessage: String;
+begin
+  Result := FastStreamToString(FStream);
+end;
 
+{$ENDREGION}
+
+{$REGION 'TIPCServerMsgQueue'}
+
 constructor TIPCServerMsgQueue.Create;
 begin
   FMaxCount := DefaultIPCMessageQueueLimit;
@@ -336,6 +458,7 @@
 
 procedure TIPCServerMsgQueue.Push(AItem: TIPCServerMsg);
 begin
+  // PrepareToPush may throw an exception, e.g. if message queue is full.
   if PrepareToPush then
     FList.Insert(0, AItem);
 end;
@@ -355,10 +478,9 @@
     Result := nil;
 end;
 
+{$ENDREGION}
 
-{ ---------------------------------------------------------------------
-    TIPCServerComm
-  ---------------------------------------------------------------------}
+{$REGION 'TIPCServerComm'}
 
 constructor TIPCServerComm.Create(AOwner: TSimpleIPCServer);
 begin
@@ -366,16 +488,13 @@
 end;
 
 procedure TIPCServerComm.DoError(const Msg: String; const Args: array of const);
-
 begin
   FOwner.DoError(Msg,Args);
 end;
 
 procedure TIPCServerComm.PushMessage(const Hdr: TMsgHeader; AStream: TStream);
-
-Var
+var
   M : TIPCServerMsg;
-
 begin
   M:=TIPCServerMsg.Create;
   try
@@ -394,9 +513,9 @@
   FOwner.PushMessage(Msg);
 end;
 
-{ ---------------------------------------------------------------------
-    TIPCClientComm
-  ---------------------------------------------------------------------}
+{$ENDREGION}
+
+{$REGION 'TIPCClientComm'}
   
 constructor TIPCClientComm.Create(AOwner: TSimpleIPCClient);
 begin
@@ -409,10 +528,10 @@
   FOwner.DoError(Msg,Args);
 end;  
 
-{ ---------------------------------------------------------------------
-    TSimpleIPC
-  ---------------------------------------------------------------------}
+{$ENDREGION}
 
+{$REGION 'TSimpleIPC'}
+
 Procedure TSimpleIPC.DoError(const Msg: String; const Args: array of const);
 var
   FullMsg: String;
@@ -441,7 +560,7 @@
 procedure TSimpleIPC.SetActive(const AValue: Boolean);
 begin
   if (FActive<>AValue) then
-    begin
+  begin
     if ([]<>([csLoading,csDesigning]*ComponentState)) then
       FActive:=AValue
     else  
@@ -449,37 +568,90 @@
         Activate
       else
         Deactivate;
-    end;
+  end;
 end;
 
 procedure TSimpleIPC.SetServerID(const AValue: String);
 begin
   if (FServerID<>AValue) then
-    begin
+  begin
     CheckInactive;
-    FServerID:=AValue
-    end;
+    FServerID:=AValue;
+  end;
 end;
 
-Procedure TSimpleIPC.Loaded; 
+procedure TSimpleIPC.PrepareServerID;
+begin
+  if FServerID = '' then
+    FServerID := ApplicationName;
+  // Extra precaution for thread-safety
+  UniqueString(FServerID);
+end;
 
-Var
+procedure TSimpleIPC.Loaded;
+var
   B : Boolean;
-
 begin
-  Inherited;
+  inherited;
   B:=FActive;
   if B then
-    begin
-    Factive:=False;
+  begin
+    FActive:=False;
     Activate;
+  end;
+end;
+
+{$ENDREGION}
+
+{$REGION 'TIPCServerThread'}
+
+type
+  TIPCServerThread = class(TThread)
+  private
+    FServer: TSimpleIPCServer;
+  protected
+    procedure Execute; override;
+  public
+    constructor Create(AServer: TSimpleIPCServer);
+    property Server: TSimpleIPCServer read FServer;
+  end;
+
+constructor TIPCServerThread.Create(AServer: TSimpleIPCServer);
+begin
+  inherited Create(True); // CreateSuspended = True
+  FServer := AServer;
+end;
+
+procedure TIPCServerThread.Execute;
+begin
+  FServer.FThreadExecuting := True;
+  try
+    FServer.StartComm;
+    try
+      // Notify server that thread has started.
+      FServer.FThreadReadyEvent.SetEvent;
+      // Run message loop
+      while not Terminated do
+        FServer.ProcessMessage(FServer.ThreadTimeout);
+    finally
+      FServer.StopComm;
     end;
+  except on E: Exception do
+    begin
+      FServer.FThreadExecuting := False;
+      FServer.FThreadError := E.Message;
+      // Trigger event to wake up the caller from potentially indefinite wait.
+      FServer.FThreadReadyEvent.SetEvent;
+      FServer.DoOnThreadError;
+    end;
+  end;
+  FServer.FThreadExecuting := False;
 end;
 
-{ ---------------------------------------------------------------------
-    TSimpleIPCServer
-  ---------------------------------------------------------------------}
+{$ENDREGION}
 
+{$REGION 'TSimpleIPCServer'}
+
 constructor TSimpleIPCServer.Create(AOwner: TComponent);
 begin
   inherited Create(AOwner);
@@ -486,9 +658,11 @@
   FGlobal:=False;
   FActive:=False;
   FBusy:=False;
-  FMsgData:=TStringStream.Create('');
+  FMessage:=nil;
   FQueue:=TIPCServerMsgQueue.Create;
-  FThreadTimeOut:=DefaultThreadTimeOut;
+  FThreaded:=DefaultThreaded;
+  FThreadTimeout:=DefaultThreadTimeout;
+  FSynchronizeEvents:=DefaultSynchronizeEvents;
 end;
 
 destructor TSimpleIPCServer.Destroy;
@@ -495,26 +669,44 @@
 begin
   Active:=False;
   FreeAndNil(FQueue);
-  FreeAndNil(FMsgData);
+  if Assigned(FMessage) then
+    FreeAndNil(FMessage);
   inherited Destroy;
 end;
 
 procedure TSimpleIPCServer.SetGlobal(const AValue: Boolean);
 begin
-  if (FGlobal<>AValue) then
-    begin
-    CheckInactive;
-    FGlobal:=AValue;
-    end;
+  CheckInactive;
+  FGlobal:=AValue;
 end;
 
+procedure TSimpleIPCServer.SetThreaded(AValue: Boolean);
+begin
+  CheckInactive;
+  FThreaded:=AValue;
+end;
+
+procedure TSimpleIPCServer.SetThreadTimeout(AValue: Integer);
+begin
+  CheckInactive;
+  FThreadTimeout:=AValue;
+end;
+
+procedure TSimpleIPCServer.SetSynchronizeEvents(AValue: Boolean);
+begin
+  CheckInactive;
+  FSynchronizeEvents:=AValue;
+end;
+
 procedure TSimpleIPCServer.SetMaxAction(AValue: TIPCMessageOverflowAction);
 begin
+  CheckInactive;
   FQueue.MaxAction:=AValue;
 end;
 
 procedure TSimpleIPCServer.SetMaxQueue(AValue: Integer);
 begin
+  CheckInactive;
   FQueue.MaxCount:=AValue;
 end;
 
@@ -533,163 +725,231 @@
   Result:=FQueue.MaxCount;
 end;
 
+procedure TSimpleIPCServer.StartComm;
+begin
+  if Assigned(FIPCComm) then
+    FreeAndNil(FIPCComm);
+  FIPCComm := CommClass.Create(Self);
+  FIPCComm.StartServer;
+end;
 
-function TSimpleIPCServer.GetStringMessage: String;
+procedure TSimpleIPCServer.StopComm;
 begin
-  Result:=TStringStream(FMsgData).DataString;
+  if Assigned(FIPCComm) then
+  begin
+    FIPCComm.StopServer;
+    FreeAndNil(FIPCComm);
+  end;
 end;
 
-
-procedure TSimpleIPCServer.StartServer(Threaded : Boolean = False);
+function TSimpleIPCServer.StartThread: Boolean;
 begin
-  if Not Assigned(FIPCComm) then
-    begin
-    If (FServerID='') then
-      FServerID:=ApplicationName;
-    FIPCComm:=CommClass.Create(Self);
-    FIPCComm.StartServer;
-    end;
-  FActive:=True;
-  If Threaded then
-    StartThread;
+  FThreadError := '';
+  FQueueLock := SyncObjs.TCriticalSection.Create;
+  FQueueAddEvent := SyncObjs.TSimpleEvent.Create;
+  FThreadReadyEvent := SyncObjs.TSimpleEvent.Create;
+  FThread := TIPCServerThread.Create(Self);
+  FThread.Start;
+  Result := WaitForReady;
 end;
 
-Type
-
-  { TServerThread }
-
-  TServerThread = Class(TThread)
-  private
-    FServer: TSimpleIPCServer;
-    FThreadTimeout: Integer;
-  Public
-    Constructor Create(AServer : TSimpleIPCServer; ATimeout : integer);
-    procedure Execute; override;
-    Property Server : TSimpleIPCServer Read FServer;
-    Property ThreadTimeout : Integer Read FThreadTimeout;
+procedure TSimpleIPCServer.StopThread;
+begin
+  if Assigned(FThread) then
+  begin
+    FThread.Terminate;
+    FThread.WaitFor;
+    FreeAndNil(FThread);
   end;
-
-{ TServerThread }
-
-constructor TServerThread.Create(AServer: TSimpleIPCServer; ATimeout: integer);
-begin
-  FServer:=AServer;
-  FThreadTimeout:=ATimeOut;
-  Inherited Create(False);
+  if Assigned(FThreadReadyEvent) then
+    FreeAndNil(FThreadReadyEvent);
+  if Assigned(FQueueAddEvent) then
+    FreeAndNil(FQueueAddEvent);
+  if Assigned(FQueueLock) then
+    FreeAndNil(FQueueLock);
 end;
 
-procedure TServerThread.Execute;
+function TSimpleIPCServer.WaitForReady(Timeout: Integer = -1): Boolean;
 begin
-  While Not Terminated do
-    FServer.PeekMessage(ThreadTimeout,False);
+  if FThreadReadyEvent.WaitFor(IPCTimeoutToEventTimeout(Timeout)) = wrSignaled then
+    Result := FThreadExecuting
+  else
+    Result := False;
 end;
 
-procedure TSimpleIPCServer.StartThread;
-
+procedure TSimpleIPCServer.StartServer;
 begin
-  InitCriticalSection(FLock);
-  FThread:=TServerThread.Create(Self,ThreadTimeOut);
+  StartServer(FThreaded);
 end;
 
-procedure TSimpleIPCServer.StopThread;
-
+procedure TSimpleIPCServer.StartServer(AThreaded: Boolean);
 begin
-  if Assigned(FThread) then
+  CheckInactive;
+  FActive := True;
+  try
+    PrepareServerID;
+    FThreaded := AThreaded;
+    if FThreaded then
     begin
-    FThread.Terminate;
-    FThread.WaitFor;
-    FreeAndNil(FThread);
-    DoneCriticalSection(FLock);
-    end;
+      if not StartThread then
+        raise EIPCError.Create(SErrThreadFailure);
+    end
+    else
+      StartComm;
+  except
+    FActive := False;
+    raise;
+  end;
 end;
 
 procedure TSimpleIPCServer.StopServer;
 begin
   StopThread;
-  If Assigned(FIPCComm) then
-    begin
-    FIPCComm.StopServer;
-    FreeAndNil(FIPCComm);
-    end;
+  StopComm;
   FQueue.Clear;
-  FActive:=False;
+  FActive := False;
 end;
 
-// TimeOut values:
+function TSimpleIPCServer.ProcessMessage(Timeout: Integer): Boolean;
+begin
+  FBusy := True;
+  try
+    // Check for new messages (may push several messages to the queue)
+    Result := FIPCComm.PeekMessage(IPCTimeoutSanitized(Timeout));
+    // Push new message to the queue (explicitly)
+    if Result then
+      FIPCComm.ReadMessage;
+  finally
+    FBusy := False;
+  end;
+end;
+
+// Timeout values:
 //   >  0  -- Number of milliseconds to wait
 //   =  0  -- return immediately
 //   = -1  -- wait infinitely
 //   < -1  -- wait infinitely (force to -1)
-function TSimpleIPCServer.PeekMessage(TimeOut: Integer; DoReadMessage: Boolean): Boolean;
+function TSimpleIPCServer.PeekMessage(Timeout: Integer; DoReadMessage: Boolean): Boolean;
 begin
   CheckActive;
-  Result:=Queue.Count>0;
-  If Not Result then
-    begin
-    if TimeOut < -1 then
-      TimeOut := -1;
-    FBusy:=True;
-    Try
-      Result:=FIPCComm.PeekMessage(Timeout);
-    Finally
-      FBusy:=False;
+
+  if Threaded then
+  begin
+    // Check if have messages in the queue
+    FQueueLock.Acquire;
+    try
+      Result:=FQueue.Count>0;
+      // Reset queue add event
+      if not Result then
+        FQueueAddEvent.ResetEvent;
+    finally
+      FQueueLock.Release;
     end;
-    end;
+    // Wait for queue add event
+    if not Result and (Timeout <> 0) then
+      Result := FQueueAddEvent.WaitFor(IPCTimeoutToEventTimeout(Timeout)) = wrSignaled;
+  end
+  else
+  begin
+    // Check if have messages in the queue
+    Result:=FQueue.Count>0;
+    // If queue is empty, process new messages via IPC driver
+    if not Result then
+      Result := ProcessMessage(Timeout);
+  end;
+
+  // Read message if available (be aware of a race condition in threaded mode)
   If Result then
     If DoReadMessage then
-      Readmessage;
+      ReadMessage;
 end;
 
+function TSimpleIPCServer.ReadMessage: Boolean;
+begin
+  // Pop a message from the queue
+  Result := PopMessage;
+  if Result then
+    DoOnMessage;
+end;
+
 function TSimpleIPCServer.PopMessage: Boolean;
+begin
+  if Threaded then
+    FQueueLock.Acquire;
+  try
+    if Assigned(FMessage) then
+      FreeAndNil(FMessage);
+    FMessage := FQueue.Pop;
+    Result := Assigned(FMessage);
+  finally
+    if Threaded then
+      FQueueLock.Release;
+  end;
+end;
 
+procedure TSimpleIPCServer.PushMessage(Msg: TIPCServerMsg);
 var
-  MsgItem: TIPCServerMsg;
-  DoLock : Boolean;
-
+  PushFailed: Boolean;
 begin
-  DoLock:=Assigned(FThread);
-  if DoLock then
-    EnterCriticalsection(Flock);
+  if Threaded then
+    FQueueLock.Acquire;
   try
-    MsgItem:=FQueue.Pop;
+    PushFailed := False;
+    try
+      // Queue.Push may throw an exception, e.g. if message queue is full.
+      FQueue.Push(Msg);
+    except
+      PushFailed := True;
+    end;
+    // Notify a waiting PeekMessage in threaded mode
+    if Threaded and not PushFailed then
+      FQueueAddEvent.SetEvent;
   finally
-    if DoLock then
-      LeaveCriticalsection(FLock);
+    if Threaded then
+      FQueueLock.Release;
   end;
-  Result:=Assigned(MsgItem);
-  if Result then
-    try
-      FMsgType := MsgItem.MsgType;
-      MsgItem.Stream.Position := 0;
-      FMsgData.Size := 0;
-      FMsgData.CopyFrom(MsgItem.Stream, MsgItem.Stream.Size);
-    finally
-      MsgItem.Free;
-    end;
+
+  if PushFailed then
+    // Handler must free the Msg, because it is not owned by anybody.
+    DoOnMessageError(Msg)
+  else
+    DoOnMessageQueued;
 end;
 
-procedure TSimpleIPCServer.ReadMessage;
+function TSimpleIPCServer.GetMsgType: TMessageType;
+begin
+  // Access to the message is not locked by design!
+  if Assigned(FMessage) then
+    Result := FMessage.MsgType
+  else
+    Result := mtUnknown;
+end;
 
+function TSimpleIPCServer.GetMsgData: TStream;
 begin
-  CheckActive;
-  FBusy:=True;
-  Try
-    if (FQueue.Count=0) then
-      // Readmessage pushes a message to the queue
-      FIPCComm.ReadMessage;
-    if PopMessage then
-      If Assigned(FOnMessage) then
-        FOnMessage(Self);
-  Finally
-    FBusy:=False;
-  end;
+  // Access to the message is not locked by design!
+  if Assigned(FMessage) then
+    Result := FMessage.Stream
+  else
+    Result := nil;
 end;
 
 procedure TSimpleIPCServer.GetMessageData(Stream: TStream);
 begin
-  Stream.CopyFrom(FMsgData,0);
+  // Access to the message is not locked by design!
+  if Assigned(FMessage) then
+    Stream.CopyFrom(FMessage.Stream, 0);
 end;
 
+function TSimpleIPCServer.GetStringMessage: String;
+begin
+  // Access to the message is not locked by design!
+  if Assigned(FMessage) then
+    Result := FMessage.StringMessage
+  else
+    Result := '';
+end;
+
 procedure TSimpleIPCServer.Activate;
 begin
   StartServer;
@@ -700,62 +960,87 @@
   StopServer;
 end;
 
+procedure TSimpleIPCServer.DoOnMessage;
+begin
+  if Assigned(FOnMessage) then
+  begin
+    if FSynchronizeEvents and Assigned(FThread) then
+      TThread.Synchronize(FThread, @InternalDoOnMessage)
+    else
+      InternalDoOnMessage;
+  end;
+end;
 
-procedure TSimpleIPCServer.DoMessageQueued;
+procedure TSimpleIPCServer.InternalDoOnMessage;
+begin
+  if Assigned(FOnMessage) then
+    FOnMessage(Self);
+end;
 
+procedure TSimpleIPCServer.DoOnMessageQueued;
 begin
   if Assigned(FOnMessageQueued) then
+  begin
+    if FSynchronizeEvents and Assigned(FThread) then
+      TThread.Synchronize(FThread, @InternalDoOnMessageQueued)
+    else
+      InternalDoOnMessageQueued;
+  end;
+end;
+
+procedure TSimpleIPCServer.InternalDoOnMessageQueued;
+begin
+  if Assigned(FOnMessageQueued) then
     FOnMessageQueued(Self);
 end;
 
-procedure TSimpleIPCServer.DoMessageError;
+procedure TSimpleIPCServer.DoOnMessageError(Msg: TIPCServerMsg);
 begin
   try
-    if Assigned(FOnMessageQueued) then
-      FOnMessageError(Self,FErrMsg);
+    if Assigned(FOnMessageError) then
+    begin
+      // Temp message (class instance variable) is used to pass
+      // a parameter to a synchronized thread method.
+      FTempMessage := Msg;
+      if FSynchronizeEvents and Assigned(FThread) then
+        TThread.Synchronize(FThread, @InternalDoOnMessageError)
+      else
+        InternalDoOnMessageError;
+    end;
   finally
-    FreeAndNil(FErrMsg)
+    // Must free the message because it is not owned by anybody.
+    FTempMessage := nil;
+    FreeAndNil(Msg);
   end;
 end;
 
-procedure TSimpleIPCServer.PushMessage(Msg: TIPCServerMsg);
+procedure TSimpleIPCServer.InternalDoOnMessageError;
+begin
+  if Assigned(FOnMessageError) then
+    FOnMessageError(Self, FTempMessage);
+end;
 
-Var
-  DoLock : Boolean;
-
+procedure TSimpleIPCServer.DoOnThreadError;
 begin
-  try
-    DoLock:=Assigned(FThread);
-    If DoLock then
-      EnterCriticalsection(FLock);
-    try
-      Queue.Push(Msg);
-    finally
-      If DoLock then
-        LeaveCriticalsection(FLock);
-    end;
-    if DoLock then
-      TThread.Synchronize(FThread,@DoMessageQueued)
+  if Assigned(FOnThreadError) then
+  begin
+    if FSynchronizeEvents and Assigned(FThread) then
+      TThread.Synchronize(FThread, @InternalDoOnThreadError)
     else
-      DoMessageQueued;
-  except
-    On E : Exception do
-      FErrMsg:=Msg;
+      InternalDoOnThreadError;
   end;
-  if Assigned(FErrMsg) then
-    if DoLock then
-      TThread.Synchronize(FThread,@DoMessageError)
-    else
-      DoMessageQueued;
+end;
 
+procedure TSimpleIPCServer.InternalDoOnThreadError;
+begin
+  if Assigned(FOnThreadError) then
+    FOnThreadError(Self);
 end;
 
+{$ENDREGION}
 
+{$REGION 'TSimpleIPCClient'}
 
-{ ---------------------------------------------------------------------
-    TSimpleIPCClient
-  ---------------------------------------------------------------------}
-
 procedure TSimpleIPCClient.SetServerInstance(const AValue: String);
 begin
   CheckInactive;
@@ -776,7 +1061,7 @@
   inherited Create(AOwner);
 end;
 
-destructor TSimpleIPCClient.destroy;
+destructor TSimpleIPCClient.Destroy;
 begin
   Active:=False;
   Inherited;
@@ -785,7 +1070,8 @@
 procedure TSimpleIPCClient.Connect;
 begin
   If Not assigned(FIPCComm) then
-    begin
+  begin
+    PrepareServerID;
     FIPCComm:=CommClass.Create(Self);
     Try
       FIPCComm.Connect;
@@ -794,7 +1080,7 @@
       Raise;
     end;  
     FActive:=True;
-    end;
+  end;
 end;
 
 procedure TSimpleIPCClient.Disconnect;
@@ -809,21 +1095,24 @@
 end;
 
 function TSimpleIPCClient.ServerRunning: Boolean;
-
+var
+  TempComm: TIPCClientComm;
 begin
   If Assigned(FIPCComm) then
     Result:=FIPCComm.ServerRunning
   else
-    With CommClass.Create(Self) do
-      Try
-        Result:=ServerRunning;
-      finally
-        Free;
-      end;
+  begin
+    PrepareServerID;
+    TempComm := CommClass.Create(Self);
+    Try
+      Result := TempComm.ServerRunning;
+    finally
+      TempComm.Free;
+    end;
+  end;
 end;
 
 procedure TSimpleIPCClient.SendMessage(MsgType : TMessageType; Stream: TStream);
-
 begin
   CheckActive;
   FBusy:=True;
@@ -839,8 +1128,7 @@
   SendStringMessage(mtString,Msg);
 end;
 
-procedure TSimpleIPCClient.SendStringMessage(MsgType: TMessageType; const Msg: String
-  );
+procedure TSimpleIPCClient.SendStringMessage(MsgType: TMessageType; const Msg: String);
 Var
   S : TStringStream;
 begin
@@ -864,11 +1152,14 @@
   SendStringMessage(MsgType, Format(Msg,Args));
 end;
 
+{$ENDREGION}
+
 {$IFDEF OSNEEDIPCINITDONE}
 initialization
   IPCInit;
 finalization
   IPCDone;
-{$ENDIF}  
+{$ENDIF}
+
 end.
 

Denis Kozlov

2017-08-10 12:07

reporter  

2017-08-10-ipcdrivers.patch (2,457 bytes)
Index: packages/fcl-process/src/amicommon/simpleipc.inc
===================================================================
--- packages/fcl-process/src/amicommon/simpleipc.inc	(revision 36715)
+++ packages/fcl-process/src/amicommon/simpleipc.inc	(working copy)
@@ -234,6 +234,7 @@
 var
   Temp: PByte;
   MsgType: TMessageType;
+  Msg: TIPCServerMsg;
 begin
   if Assigned(MsgBody) then
   begin
@@ -241,11 +242,18 @@
     Inc(Temp, SizeOf(Exec.TMessage));
     MsgType := 0;
     Move(Temp^, MsgType, SizeOf(TMessageType));
-    Inc(Temp, SizeOf(TMessageType));    
-    Owner.FMsgType := MsgType;
-    Owner.FMsgData.Size := 0;
-    Owner.FMsgData.Seek(0, soFrombeginning);
-    Owner.FMsgData.WriteBuffer(temp^, MsgBody^.mn_Length);
+    Inc(Temp, SizeOf(TMessageType));
+
+    Msg := TIPCServerMsg.Create;
+    try
+      Msg.MsgType := MsgType;
+      Msg.Stream.WriteBuffer(Temp^, MsgBody^.mn_Length);
+    except
+      FreeAndNil(Msg);
+      raise;
+    end;
+    PushMessage(Msg);
+
     System.FreeMem(MsgBody);
     MsgBody := nil;
   end;
Index: packages/fcl-process/src/unix/simpleipc.inc
===================================================================
--- packages/fcl-process/src/unix/simpleipc.inc	(revision 36715)
+++ packages/fcl-process/src/unix/simpleipc.inc	(working copy)
@@ -131,8 +131,6 @@
   Private
     FFileName: String;
     FStream: TFileStream;
-  Protected
-    Procedure DoReadMessage; virtual;
   Public
     Constructor Create(AOWner : TSimpleIPCServer); override;
     Procedure StartServer; override;
@@ -144,16 +142,7 @@
     Property Stream : TFileStream Read FStream;
   end;
 
-procedure TPipeServerComm.DoReadMessage;
 
-Var
-  Hdr : TMsgHeader;
-
-begin
-  FStream.ReadBuffer(Hdr,SizeOf(Hdr));
-  PushMessage(Hdr,FStream);
-end;
-
 constructor TPipeServerComm.Create(AOWner: TSimpleIPCServer);
 begin
   inherited Create(AOWner);
@@ -187,25 +176,20 @@
 end;
 
 function TPipeServerComm.PeekMessage(TimeOut: Integer): Boolean;
-
 Var
   FDS : TFDSet;
-
 begin
   fpfd_zero(FDS);
   fpfd_set(FStream.Handle,FDS);
-  Result:=False;
-  While fpSelect(FStream.Handle+1,@FDS,Nil,Nil,TimeOut)>0 do
-    begin
-    DoReadMessage;
-    Result:=True;
-    end;
+  Result := fpSelect(FStream.Handle+1,@FDS,Nil,Nil,TimeOut)>0;
 end;
 
 procedure TPipeServerComm.ReadMessage;
-
+Var
+  Hdr : TMsgHeader;
 begin
-  DoReadMessage;
+  FStream.ReadBuffer(Hdr,SizeOf(Hdr));
+  PushMessage(Hdr,FStream);
 end;
 
 

Denis Kozlov

2017-08-10 12:07

reporter  

ThreadedIPC-v2.zip (1,838 bytes)

Denis Kozlov

2017-08-10 12:07

reporter  

ThreadedIPCGUI-v2.zip (128,723 bytes)

Michael Van Canneyt

2017-08-15 11:55

administrator   ~0102180

Applied the patch from Denis. Many many thanks !

Issue History

Date Modified Username Field Change
2017-05-12 20:20 Lars(L505) New Issue
2017-05-15 15:26 Michael Van Canneyt Assigned To => Michael Van Canneyt
2017-05-15 15:26 Michael Van Canneyt Status new => assigned
2017-05-15 21:05 Lars(L505) Note Added: 0100325
2017-05-15 21:07 Lars(L505) Note Edited: 0100325 View Revisions
2017-05-15 21:10 Lars(L505) Note Edited: 0100325 View Revisions
2017-05-16 09:08 Thaddy de Koning Note Added: 0100329
2017-05-16 09:17 Michael Van Canneyt Note Added: 0100331
2017-05-16 14:07 Michael Van Canneyt File Added: bugdemo.zip
2017-05-16 15:26 Michael Van Canneyt Note Added: 0100335
2017-05-18 00:57 Lars(L505) Note Added: 0100379
2017-05-18 00:58 Lars(L505) Note Edited: 0100379 View Revisions
2017-05-18 01:38 Lars(L505) Note Added: 0100381
2017-05-18 01:40 Lars(L505) Note Edited: 0100381 View Revisions
2017-05-18 01:41 Lars(L505) Note Edited: 0100381 View Revisions
2017-05-18 07:40 Michael Van Canneyt Note Added: 0100384
2017-05-19 18:26 Denis Kozlov Note Added: 0100498
2017-05-19 22:23 Denis Kozlov Note Added: 0100513
2017-05-20 13:36 Michael Van Canneyt Note Added: 0100540
2017-05-20 21:24 Denis Kozlov Note Added: 0100547
2017-06-03 07:18 Michael Van Canneyt Note Added: 0100813
2017-06-04 17:21 Marco van de Voort Note Added: 0100858
2017-06-08 02:45 Denis Kozlov Note Added: 0100943
2017-06-08 11:24 Thaddy de Koning Note Added: 0100945
2017-06-08 11:27 Thaddy de Koning Note Edited: 0100945 View Revisions
2017-06-10 21:10 Denis Kozlov Note Added: 0101009
2017-06-10 22:05 Michael Van Canneyt Note Added: 0101010
2017-06-13 14:41 Denis Kozlov Note Added: 0101092
2017-06-13 15:58 Michael Van Canneyt Note Added: 0101095
2017-06-20 18:17 Denis Kozlov Note Added: 0101232
2017-07-10 00:44 Denis Kozlov Note Added: 0101645
2017-07-10 00:45 Denis Kozlov File Added: simpleipc.pp.patch
2017-07-10 00:45 Denis Kozlov File Added: ThreadedIPC.zip
2017-07-10 00:46 Denis Kozlov File Added: ThreadedIPCGUI.zip
2017-07-10 00:46 Denis Kozlov Tag Attached: patch
2017-07-11 10:13 Michael Van Canneyt Note Added: 0101662
2017-07-11 10:13 Michael Van Canneyt Status assigned => feedback
2017-08-10 12:06 Denis Kozlov Note Added: 0102131
2017-08-10 12:07 Denis Kozlov File Added: 2017-08-10-simpleipc.patch
2017-08-10 12:07 Denis Kozlov File Added: 2017-08-10-ipcdrivers.patch
2017-08-10 12:07 Denis Kozlov File Added: ThreadedIPC-v2.zip
2017-08-10 12:07 Denis Kozlov File Added: ThreadedIPCGUI-v2.zip
2017-08-15 11:55 Michael Van Canneyt Fixed in Revision => 36915
2017-08-15 11:55 Michael Van Canneyt Note Added: 0102180
2017-08-15 11:55 Michael Van Canneyt Status feedback => resolved
2017-08-15 11:55 Michael Van Canneyt Fixed in Version => 3.1.1
2017-08-15 11:55 Michael Van Canneyt Resolution open => fixed
2017-08-15 11:55 Michael Van Canneyt Target Version => 3.2.0
2018-02-10 01:08 Michael Van Canneyt Relationship added related to 0033130