unit BoundedBuf;

{Martin Harvey 24/4/2000}

interface

uses Windows, SysUtils;

const
  { Timeout, in milliseconds (five seconds), passed to WaitForSingleObject
    whenever this unit waits on the critical section mutex }
  DefaultWaitTime = 5000; { Five second wait on mutexes }

type
  { I don't particularly like dynamic arrays, so I'm going to do things
    the "C" way here, explicitly allocating memory.
    Think of TBufferEntries as ^(array of pointer): it points at the first
    Pointer-sized slot of a GetMem'd block, and slot N is reached by copying
    the base pointer and calling Inc(P, N) on the copy. }

  TBufferEntries = ^Pointer;

  { Fixed-size circular buffer of untyped pointers, intended for one producer
    thread (PutItem) and one consumer thread (GetItem).
    Flow control uses two Win32 semaphores; the read/write indices and the
    slot storage are protected by a Win32 mutex.
    The buffer is unusable until Size has been assigned. }
  TBoundedBuffer = class
  private
    FBufInit: boolean; { Set true by SetSize, set false again by ResetState }
    FBufSize: integer; { Number of slots allocated in FBuf (SetSize forces at least 2) }
    FBuf: TBufferEntries; { Base of the GetMem'd slot array }
    FReadPtr, { ReadPtr points to next used entry in buffer}
    FWritePtr: integer; { WritePtr points to next free entry in buffer}
    { Flow control semaphores. FEntriesFree is created with an initial count
      of FBufSize - 1 and FEntriesUsed with 0; both have a maximum of FBufSize }
    FEntriesFree, FEntriesUsed: THandle; { Flow control semaphores }
    FCriticalMutex: THandle; { Critical section mutex }
  protected
    { Property setter: resets any existing state, then allocates the slots
      and creates the mutex and semaphores }
    procedure SetSize(NewSize: integer);
  public
    { Frees the slot memory and closes all handles; does nothing if the
      buffer has not been initialised }
    procedure ResetState;
    { Calls ResetState before the inherited destructor }
    destructor Destroy; override;
    { Producer side: waits for a free slot, stores NewItem, returns true on
      success and false if any wait fails or the buffer is not initialised }
    function PutItem(NewItem: Pointer): boolean;
    { Consumer side: waits for a used slot and returns its pointer, or nil if
      any wait fails or the buffer is not initialised }
    function GetItem: Pointer;
  published
    { Number of slots. Writing this property (re)initialises the buffer }
    property Size: integer read FBufSize write SetSize;
  end;

  { No constructor required because default values of 0, false etc acceptable }

implementation

const
  { Failure descriptions. Note: nothing in this unit references these two
    constants; PutItem and GetItem report failure via their return values }
  FailMsg1 = 'Flow control failed, or buffer not initialised';
  FailMsg2 = 'Critical section failed, or buffer not initialised';

procedure TBoundedBuffer.SetSize(NewSize: integer);

{ Initialises handles and allocates memory.
  If the buffer size has previously been set, then this may invoke a buffer
  reset.

  NewSize is the requested number of slots; values below 2 are raised to 2.
  If any synchronisation object cannot be created, the state is reset again
  and the buffer is left uninitialised (PutItem / GetItem will then fail). }

begin
  if FBufInit then ResetState;
  if NewSize < 2 then NewSize := 2;
  { A fresh buffer must start with both indices at slot 0. Without this, a
    previous, larger buffer could leave an index beyond the end of the new
    allocation, and the next PutItem / GetItem would touch memory outside it }
  FReadPtr := 0;
  FWritePtr := 0;
  FBufSize := NewSize;
  GetMem(FBuf, Sizeof(Pointer) * FBufSize);
  FillMemory(FBuf, Sizeof(Pointer) * FBufSize, 0);
  FBufInit := true;
  FCriticalMutex := CreateMutex(nil, false, nil); { note lack of name }
  { The initial count on the semaphores requires some thought,
    The maximum count requires more thought.
    EntriesFree starts one below its maximum, so ResetState can always
    release it once more, even when the buffer is empty.
    Again, all synchronisation objects are anonymous }
  FEntriesFree := CreateSemaphore(nil, FBufSize - 1, FBufSize, nil);
  FEntriesUsed := CreateSemaphore(nil, 0, FBufSize, nil);
  { Any handle of 0 means creation failed: undo the partial initialisation }
  if (FCriticalMutex = 0)
    or (FEntriesFree = 0)
    or (FEntriesUsed = 0) then ResetState
end;

procedure TBoundedBuffer.ResetState;

{ Closes handles and deallocates memory.
  Note that this must unblock threads in such a manner that they quit cleanly.

  Does nothing if the buffer is not initialised. Afterwards every field is
  back at its zero value, so a later PutItem / GetItem fails immediately on
  a zero handle instead of waiting on a closed (and possibly recycled)
  handle value, and no dangling buffer pointer or stale index remains. }

begin
  if FBufInit then
  begin
    { Try to get exclusive access first; carry on regardless after the
      timeout so that teardown can never block forever }
    WaitForSingleObject(FCriticalMutex, DefaultWaitTime);
    FBufInit := false;
    FBufSize := 0;
    FReadPtr := 0;
    FWritePtr := 0;
    FreeMem(FBuf);
    FBuf := nil;
    { Now wake up all threads currently waiting.
      Currently assumes only 1 producer and 1 consumer.
      Plenty of ordering subtleties and pitfalls to be discussed here }
    ReleaseSemaphore(FEntriesFree, 1, nil);
    ReleaseSemaphore(FEntriesUsed, 1, nil);
    { A handle may be 0 here if SetSize failed part way through creation,
      so only close the ones that exist }
    if FEntriesFree <> 0 then
      CloseHandle(FEntriesFree);
    FEntriesFree := 0;
    if FEntriesUsed <> 0 then
      CloseHandle(FEntriesUsed);
    FEntriesUsed := 0;
    { If reader or writer threads are waiting,
      then they will be waiting on the mutex.
      We will close the handle and let them time out }
    if FCriticalMutex <> 0 then
      CloseHandle(FCriticalMutex);
    FCriticalMutex := 0;
  end;
end;

function TBoundedBuffer.PutItem(NewItem: Pointer): boolean;

{ Called by producer thread.
  Blocks until a slot is free, then stores NewItem at the write index.
  Returns true if the item was stored; false if either wait fails (for
  example because the handles were closed or the mutex wait timed out) or
  the buffer is not initialised. }
var
  NthItem: TBufferEntries;

begin
  result := false;
  { WAIT(EntriesFree) }
  if WaitForSingleObject(FEntriesFree, INFINITE) <> WAIT_OBJECT_0 then
    exit;
  if WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0 then
    exit;
  if not FBufInit then
  begin
    { We own the mutex at this point, so give it back before bailing out
      rather than leaving it held by this thread }
    ReleaseMutex(FCriticalMutex);
    exit;
  end;
  { Step from the base of the array to the slot at the write index }
  NthItem := FBuf;
  Inc(NthItem, FWritePtr);
  NthItem^ := NewItem;
  FWritePtr := (FWritePtr + 1) mod FBufSize; { wrap around at end of buffer }
  ReleaseMutex(FCriticalMutex);
  { SIGNAL(EntriesUsed) }
  ReleaseSemaphore(FEntriesUsed, 1, nil);
  result := true;
end;

function TBoundedBuffer.GetItem: Pointer;

{ Called by consumer thread.
  Blocks until an entry is available, then returns the pointer stored at
  the read index.
  Returns nil if either wait fails (for example because the handles were
  closed or the mutex wait timed out) or the buffer is not initialised. }
var
  NthItem: TBufferEntries;

begin
  result := nil;
  { WAIT(EntriesUsed) }
  if WaitForSingleObject(FEntriesUsed, INFINITE) <> WAIT_OBJECT_0 then
    exit;
  if WaitForSingleObject(FCriticalMutex, DefaultWaitTime) <> WAIT_OBJECT_0 then
    exit;
  if not FBufInit then
  begin
    { We own the mutex at this point, so give it back before bailing out
      rather than leaving it held by this thread }
    ReleaseMutex(FCriticalMutex);
    exit;
  end;
  { Step from the base of the array to the slot at the read index }
  NthItem := FBuf;
  Inc(NthItem, FReadPtr);
  Result := NthItem^;
  FReadPtr := (FReadPtr + 1) mod FBufSize; { wrap around at end of buffer }
  ReleaseMutex(FCriticalMutex);
  { SIGNAL(EntriesFree) }
  ReleaseSemaphore(FEntriesFree, 1, nil);
end;

destructor TBoundedBuffer.Destroy;

{ Releases the buffer memory and synchronisation handles (via ResetState)
  before handing over to the ancestor's destructor }

begin
  ResetState;
  inherited;
end;

end.