|
@@ -15,19 +15,33 @@ uses
|
|
|
{$EndIf}
|
|
|
|
|
|
type
|
|
|
+ TPQCursor = Class;
|
|
|
+
|
|
|
+ { TPQTrans }
|
|
|
+
|
|
|
TPQTrans = Class(TSQLHandle)
|
|
|
- protected
|
|
|
+ protected
|
|
|
PGConn : PPGConn;
|
|
|
+ FList : TThreadList;
|
|
|
+ Procedure RegisterCursor(S : TPQCursor);
|
|
|
+ Procedure UnRegisterCursor(S : TPQCursor);
|
|
|
+ Public
|
|
|
+ Constructor Create;
|
|
|
+ Destructor Destroy; override;
|
|
|
end;
|
|
|
|
|
|
+ { TPQCursor }
|
|
|
+
|
|
|
TPQCursor = Class(TSQLCursor)
|
|
|
- protected
|
|
|
+ protected
|
|
|
Statement : string;
|
|
|
StmtName : string;
|
|
|
tr : TPQTrans;
|
|
|
res : PPGresult;
|
|
|
CurTuple : integer;
|
|
|
FieldBinding : array of integer;
|
|
|
+ Public
|
|
|
+ Destructor Destroy; override;
|
|
|
end;
|
|
|
|
|
|
EPQDatabaseError = class(EDatabaseError)
|
|
@@ -50,8 +64,8 @@ type
|
|
|
|
|
|
TPQConnection = class (TSQLConnection)
|
|
|
private
|
|
|
- FConnectionPool : array of TPQTranConnection;
|
|
|
- FCursorCount : word;
|
|
|
+ FConnectionPool : TThreadList;
|
|
|
+ FCursorCount : dword;
|
|
|
FConnectString : string;
|
|
|
FIntegerDateTimes : boolean;
|
|
|
FVerboseErrors : Boolean;
|
|
@@ -60,6 +74,11 @@ type
|
|
|
function TranslateFldType(res : PPGresult; Tuple : integer; out Size : integer) : TFieldType;
|
|
|
procedure ExecuteDirectPG(const Query : String);
|
|
|
protected
|
|
|
+ // Add connection to pool.
|
|
|
+ procedure AddConnection(T: TPQTranConnection);
|
|
|
+ // Release connection in pool.
|
|
|
+ procedure ReleaseConnection(Conn: PPGConn; DoClear : Boolean);
|
|
|
+
|
|
|
procedure DoInternalConnect; override;
|
|
|
procedure DoInternalDisconnect; override;
|
|
|
function GetHandle : pointer; override;
|
|
@@ -86,6 +105,7 @@ type
|
|
|
function RowsAffected(cursor: TSQLCursor): TRowsCount; override;
|
|
|
public
|
|
|
constructor Create(AOwner : TComponent); override;
|
|
|
+ destructor destroy; override;
|
|
|
function GetConnectionInfo(InfoType:TConnInfoType): string; override;
|
|
|
procedure CreateDB; override;
|
|
|
procedure DropDB; override;
|
|
@@ -152,6 +172,53 @@ const Oid_Bool = 16;
|
|
|
oid_numeric = 1700;
|
|
|
Oid_uuid = 2950;
|
|
|
|
|
|
+{ TPQTrans }
|
|
|
+
|
|
|
+procedure TPQTrans.RegisterCursor(S: TPQCursor);
|
|
|
+begin
|
|
|
+ FList.Add(S);
|
|
|
+ S.tr:=Self;
|
|
|
+end;
|
|
|
+
|
|
|
+procedure TPQTrans.UnRegisterCursor(S: TPQCursor);
|
|
|
+begin
|
|
|
+ S.tr:=Nil;
|
|
|
+ FList.Remove(S);
|
|
|
+end;
|
|
|
+
|
|
|
+constructor TPQTrans.Create;
|
|
|
+begin
|
|
|
+ Flist:=TThreadList.Create;
|
|
|
+ FList.Duplicates:=dupIgnore;
|
|
|
+end;
|
|
|
+
|
|
|
+destructor TPQTrans.Destroy;
|
|
|
+
|
|
|
+Var
|
|
|
+ L : TList;
|
|
|
+ I : integer;
|
|
|
+
|
|
|
+begin
|
|
|
+ L:=Flist.LockList;
|
|
|
+ try
|
|
|
+ For I:=0 to L.Count-1 do
|
|
|
+ TPQCursor(L[i]).tr:=Nil;
|
|
|
+ finally
|
|
|
+ Flist.UnlockList;
|
|
|
+ end;
|
|
|
+ FreeAndNil(FList);
|
|
|
+ inherited Destroy;
|
|
|
+end;
|
|
|
+
|
|
|
+{ TPQCursor }
|
|
|
+
|
|
|
+destructor TPQCursor.Destroy;
|
|
|
+begin
|
|
|
+ if Assigned(tr) then
|
|
|
+ Tr.UnRegisterCursor(Self);
|
|
|
+ inherited Destroy;
|
|
|
+end;
|
|
|
+
|
|
|
|
|
|
constructor TPQConnection.Create(AOwner : TComponent);
|
|
|
|
|
@@ -160,6 +227,15 @@ begin
|
|
|
FConnOptions := FConnOptions + [sqSupportParams] + [sqEscapeRepeat] + [sqEscapeSlash];
|
|
|
FieldNameQuoteChars:=DoubleQuotes;
|
|
|
VerboseErrors:=True;
|
|
|
+ FConnectionPool:=TThreadlist.Create;
|
|
|
+end;
|
|
|
+
|
|
|
+destructor TPQConnection.destroy;
|
|
|
+begin
|
|
|
+ // We must disconnect here. If it is done in inherited, then connection pool is gone.
|
|
|
+ Connected:=False;
|
|
|
+ FreeAndNil(FConnectionPool);
|
|
|
+ inherited destroy;
|
|
|
end;
|
|
|
|
|
|
procedure TPQConnection.CreateDB;
|
|
@@ -174,7 +250,7 @@ begin
|
|
|
ExecuteDirectPG('DROP DATABASE ' +DatabaseName);
|
|
|
end;
|
|
|
|
|
|
-procedure TPQConnection.ExecuteDirectPG(const query : string);
|
|
|
+procedure TPQConnection.ExecuteDirectPG(const Query: String);
|
|
|
|
|
|
var ASQLDatabaseHandle : PPGConn;
|
|
|
res : PPGresult;
|
|
@@ -207,6 +283,39 @@ begin
|
|
|
{$EndIf}
|
|
|
end;
|
|
|
|
|
|
+procedure TPQConnection.AddConnection(T: TPQTranConnection);
|
|
|
+
|
|
|
+begin
|
|
|
+ FConnectionPool.Add(T);
|
|
|
+end;
|
|
|
+
|
|
|
+procedure TPQConnection.ReleaseConnection(Conn: PPGConn; DoClear: Boolean);
|
|
|
+
|
|
|
+Var
|
|
|
+ I : Integer;
|
|
|
+ L : TList;
|
|
|
+ T : TPQTranConnection;
|
|
|
+
|
|
|
+begin
|
|
|
+ L:=FConnectionPool.LockList;
|
|
|
+ // make connection available in pool
|
|
|
+ try
|
|
|
+ for i:=0 to L.Count-1 do
|
|
|
+ begin
|
|
|
+ T:=TPQTranConnection(L[i]);
|
|
|
+ if (T.FPGConn=Conn) then
|
|
|
+ begin
|
|
|
+ T.FTranActive:=false;
|
|
|
+ if DoClear then
|
|
|
+ T.FPGConn:=Nil;
|
|
|
+ break;
|
|
|
+ end;
|
|
|
+ end
|
|
|
+ finally
|
|
|
+ FConnectionPool.UnlockList;
|
|
|
+ end;
|
|
|
+end;
|
|
|
+
|
|
|
|
|
|
function TPQConnection.GetTransactionHandle(trans : TSQLHandle): pointer;
|
|
|
begin
|
|
@@ -218,23 +327,26 @@ var
|
|
|
res : PPGresult;
|
|
|
tr : TPQTrans;
|
|
|
i : Integer;
|
|
|
+ L : TList;
|
|
|
+
|
|
|
begin
|
|
|
result := false;
|
|
|
-
|
|
|
tr := trans as TPQTrans;
|
|
|
-
|
|
|
+ L:=tr.FList.LockList;
|
|
|
+ try
|
|
|
+ For I:=0 to L.Count-1 do
|
|
|
+ begin
|
|
|
+ UnprepareStatement(TPQCursor(L[i]));
|
|
|
+ TPQCursor(L[i]).tr:=Nil;
|
|
|
+ end;
|
|
|
+ L.Clear;
|
|
|
+ finally
|
|
|
+ tr.flist.UnlockList;
|
|
|
+ end;
|
|
|
res := PQexec(tr.PGConn, 'ROLLBACK');
|
|
|
-
|
|
|
CheckResultError(res,tr.PGConn,SErrRollbackFailed);
|
|
|
-
|
|
|
PQclear(res);
|
|
|
- //make connection available in pool
|
|
|
- for i:=0 to length(FConnectionPool)-1 do
|
|
|
- if FConnectionPool[i].FPGConn=tr.PGConn then
|
|
|
- begin
|
|
|
- FConnectionPool[i].FTranActive:=false;
|
|
|
- break;
|
|
|
- end;
|
|
|
+ ReleaseConnection(tr.PGCOnn,false);
|
|
|
result := true;
|
|
|
end;
|
|
|
|
|
@@ -245,20 +357,12 @@ var
|
|
|
i : Integer;
|
|
|
begin
|
|
|
result := false;
|
|
|
-
|
|
|
tr := trans as TPQTrans;
|
|
|
-
|
|
|
res := PQexec(tr.PGConn, 'COMMIT');
|
|
|
CheckResultError(res,tr.PGConn,SErrCommitFailed);
|
|
|
-
|
|
|
PQclear(res);
|
|
|
//make connection available in pool
|
|
|
- for i:=0 to length(FConnectionPool)-1 do
|
|
|
- if FConnectionPool[i].FPGConn=tr.PGConn then
|
|
|
- begin
|
|
|
- FConnectionPool[i].FTranActive:=false;
|
|
|
- break;
|
|
|
- end;
|
|
|
+ ReleaseConnection(tr.PGConn,false);
|
|
|
result := true;
|
|
|
end;
|
|
|
|
|
@@ -267,35 +371,47 @@ var
|
|
|
res : PPGresult;
|
|
|
tr : TPQTrans;
|
|
|
i : Integer;
|
|
|
+ t : TPQTranConnection;
|
|
|
+ L : TList;
|
|
|
begin
|
|
|
result:=false;
|
|
|
tr := trans as TPQTrans;
|
|
|
|
|
|
//find an unused connection in the pool
|
|
|
i:=0;
|
|
|
- while i<length(FConnectionPool) do
|
|
|
- if (FConnectionPool[i].FPGConn=nil) or not FConnectionPool[i].FTranActive then
|
|
|
- break
|
|
|
- else
|
|
|
+ t:=Nil;
|
|
|
+ L:=FConnectionPool.LockList;
|
|
|
+ try
|
|
|
+ while (I<L.Count-1) do
|
|
|
+ begin
|
|
|
+ T:=TPQTranConnection(L[i]);
|
|
|
+ if (T.FPGConn=nil) or not T.FTranActive then
|
|
|
+ break
|
|
|
+ else
|
|
|
+ T:=Nil;
|
|
|
i:=i+1;
|
|
|
- if i=length(FConnectionPool) then //create a new connection
|
|
|
+ end;
|
|
|
+ // set to active now, so when we exit critical section,
|
|
|
+ // it will be marked active and will not be found.
|
|
|
+ if Assigned(T) then
|
|
|
+ T.FTranActive:=true;
|
|
|
+ finally
|
|
|
+ FConnectionPool.UnLockList;
|
|
|
+ end;
|
|
|
+ if (T=Nil) then
|
|
|
+ begin
|
|
|
+ T:=TPQTranConnection.Create;
|
|
|
+ T.FTranActive:=True;
|
|
|
+ AddConnection(T);
|
|
|
+ end;
|
|
|
+ if (T.FPGConn<>nil) then
|
|
|
+ tr.PGConn:=T.FPGConn
|
|
|
+ else
|
|
|
begin
|
|
|
tr.PGConn := PQconnectdb(pchar(FConnectString));
|
|
|
CheckConnectionStatus(tr.PGConn);
|
|
|
-
|
|
|
if CharSet <> '' then
|
|
|
PQsetClientEncoding(tr.PGConn, pchar(CharSet));
|
|
|
-
|
|
|
- //store the new connection
|
|
|
- SetLength(FConnectionPool,i+1);
|
|
|
- FConnectionPool[i]:=TPQTranConnection.Create;
|
|
|
- FConnectionPool[i].FPGConn:=tr.PGConn;
|
|
|
- FConnectionPool[i].FTranActive:=true;
|
|
|
- end
|
|
|
- else //re-use existing connection
|
|
|
- begin
|
|
|
- tr.PGConn:=FConnectionPool[i].FPGConn;
|
|
|
- FConnectionPool[i].FTranActive:=true;
|
|
|
end;
|
|
|
|
|
|
res := PQexec(tr.PGConn, 'BEGIN');
|
|
@@ -339,7 +455,10 @@ end;
|
|
|
|
|
|
|
|
|
procedure TPQConnection.DoInternalConnect;
|
|
|
-var ASQLDatabaseHandle : PPGConn;
|
|
|
+var
|
|
|
+ ASQLDatabaseHandle : PPGConn;
|
|
|
+ T : TPQTranConnection;
|
|
|
+
|
|
|
begin
|
|
|
{$IfDef LinkDynamically}
|
|
|
InitialisePostgres3;
|
|
@@ -365,24 +484,33 @@ begin
|
|
|
// This only works for pg>=8.0, so timestamps won't work with earlier versions of pg which are compiled with integer_datetimes on
|
|
|
if PQparameterStatus<>nil then
|
|
|
FIntegerDateTimes := PQparameterStatus(ASQLDatabaseHandle,'integer_datetimes') = 'on';
|
|
|
-
|
|
|
- SetLength(FConnectionPool,1);
|
|
|
- FConnectionPool[0]:=TPQTranConnection.Create;
|
|
|
- FConnectionPool[0].FPGConn:=ASQLDatabaseHandle;
|
|
|
- FConnectionPool[0].FTranActive:=false;
|
|
|
+ T:=TPQTranConnection.Create;
|
|
|
+ T.FPGConn:=ASQLDatabaseHandle;
|
|
|
+ T.FTranActive:=false;
|
|
|
+ AddConnection(T);
|
|
|
end;
|
|
|
|
|
|
procedure TPQConnection.DoInternalDisconnect;
|
|
|
-var i:integer;
|
|
|
+var
|
|
|
+ i:integer;
|
|
|
+ L : TList;
|
|
|
+ T : TPQTranConnection;
|
|
|
+
|
|
|
begin
|
|
|
Inherited;
|
|
|
- for i:=0 to length(FConnectionPool)-1 do
|
|
|
- begin
|
|
|
- if assigned(FConnectionPool[i].FPGConn) then
|
|
|
- PQfinish(FConnectionPool[i].FPGConn);
|
|
|
- FConnectionPool[i].Free;
|
|
|
- end;
|
|
|
- Setlength(FConnectionPool,0);
|
|
|
+ L:=FConnectionPool.LockList;
|
|
|
+ try
|
|
|
+ for i:=0 to L.Count-1 do
|
|
|
+ begin
|
|
|
+ T:=TPQTranConnection(L[i]);
|
|
|
+ if assigned(T.FPGConn) then
|
|
|
+ PQfinish(T.FPGConn);
|
|
|
+ T.Free;
|
|
|
+ end;
|
|
|
+ L.Clear;
|
|
|
+ finally
|
|
|
+ FConnectionPool.UnLockList;
|
|
|
+ end;
|
|
|
{$IfDef LinkDynamically}
|
|
|
ReleasePostgres3;
|
|
|
{$EndIf}
|
|
@@ -396,13 +524,7 @@ begin
|
|
|
begin
|
|
|
sErr := PQerrorMessage(conn);
|
|
|
//make connection available in pool
|
|
|
- for i:=0 to length(FConnectionPool)-1 do
|
|
|
- if FConnectionPool[i].FPGConn=conn then
|
|
|
- begin
|
|
|
- FConnectionPool[i].FPGConn:=nil;
|
|
|
- FConnectionPool[i].FTranActive:=false;
|
|
|
- break;
|
|
|
- end;
|
|
|
+ ReleaseConnection(Conn,True);
|
|
|
PQfinish(conn);
|
|
|
DatabaseError(sErrConnectionFailed + ' (PostgreSQL: ' + sErr + ')', Self);
|
|
|
end;
|
|
@@ -463,14 +585,7 @@ begin
|
|
|
if assigned(conn) then
|
|
|
begin
|
|
|
PQFinish(conn);
|
|
|
- //make connection available in pool
|
|
|
- for i:=0 to length(FConnectionPool)-1 do
|
|
|
- if FConnectionPool[i].FPGConn=conn then
|
|
|
- begin
|
|
|
- FConnectionPool[i].FPGConn:=nil;
|
|
|
- FConnectionPool[i].FTranActive:=false;
|
|
|
- break;
|
|
|
- end;
|
|
|
+ ReleaseConnection(Conn,True);
|
|
|
end;
|
|
|
raise E;
|
|
|
end;
|
|
@@ -549,18 +664,18 @@ begin
|
|
|
end;
|
|
|
end;
|
|
|
|
|
|
-Function TPQConnection.AllocateCursorHandle : TSQLCursor;
|
|
|
+function TPQConnection.AllocateCursorHandle: TSQLCursor;
|
|
|
|
|
|
begin
|
|
|
result := TPQCursor.create;
|
|
|
end;
|
|
|
|
|
|
-Procedure TPQConnection.DeAllocateCursorHandle(var cursor : TSQLCursor);
|
|
|
+procedure TPQConnection.DeAllocateCursorHandle(var cursor: TSQLCursor);
|
|
|
begin
|
|
|
FreeAndNil(cursor);
|
|
|
end;
|
|
|
|
|
|
-Function TPQConnection.AllocateTransactionHandle : TSQLHandle;
|
|
|
+function TPQConnection.AllocateTransactionHandle: TSQLHandle;
|
|
|
|
|
|
begin
|
|
|
result := TPQTrans.create;
|
|
@@ -625,8 +740,9 @@ begin
|
|
|
if FStatementType in [stInsert,stUpdate,stDelete, stSelect] then
|
|
|
begin
|
|
|
StmtName := 'prepst'+inttostr(FCursorCount);
|
|
|
- inc(FCursorCount);
|
|
|
- tr := TPQTrans(aTransaction.Handle);
|
|
|
+ InterlockedIncrement(FCursorCount);
|
|
|
+ TPQTrans(aTransaction.Handle).RegisterCursor(Cursor as TPQCursor);
|
|
|
+
|
|
|
// Only available for pq 8.0, so don't use it...
|
|
|
// Res := pqprepare(tr,'prepst'+name+nr,pchar(buf),params.Count,pchar(''));
|
|
|
s := 'prepare '+StmtName+' ';
|
|
@@ -755,7 +871,8 @@ begin
|
|
|
end
|
|
|
else
|
|
|
begin
|
|
|
- tr := TPQTrans(aTransaction.Handle);
|
|
|
+ // Registercursor sets tr
|
|
|
+ TPQTrans(aTransaction.Handle).RegisterCursor(Cursor as TPQCursor);
|
|
|
|
|
|
if Assigned(AParams) and (AParams.Count > 0) then
|
|
|
begin
|
|
@@ -816,26 +933,39 @@ end;
|
|
|
function TPQConnection.GetHandle: pointer;
|
|
|
var
|
|
|
i:integer;
|
|
|
+ L : TList;
|
|
|
+ T : TPQTranConnection;
|
|
|
+
|
|
|
begin
|
|
|
result:=nil;
|
|
|
if not Connected then
|
|
|
exit;
|
|
|
//Get any handle that is (still) connected
|
|
|
- for i:=0 to length(FConnectionPool)-1 do
|
|
|
- if assigned(FConnectionPool[i].FPGConn) and (PQstatus(FConnectionPool[i].FPGConn)<>CONNECTION_BAD) then
|
|
|
+ L:=FConnectionPool.LockList;
|
|
|
+ try
|
|
|
+ I:=L.Count-1;
|
|
|
+ While (I>=0) and (Result=Nil) do
|
|
|
begin
|
|
|
- Result :=FConnectionPool[i].FPGConn;
|
|
|
- exit;
|
|
|
+ T:=TPQTranConnection(L[i]);
|
|
|
+ if assigned(T.FPGConn) and (PQstatus(T.FPGConn)<>CONNECTION_BAD) then
|
|
|
+ Result:=T.FPGConn;
|
|
|
+ Dec(I);
|
|
|
end;
|
|
|
+ finally
|
|
|
+ FConnectionPool.UnLockList;
|
|
|
+ end;
|
|
|
+ if Result<>Nil then
|
|
|
+ exit;
|
|
|
//Nothing connected!! Reconnect
|
|
|
- if assigned(FConnectionPool[0].FPGConn) then
|
|
|
- PQreset(FConnectionPool[0].FPGConn)
|
|
|
+ // T is element 0 after loop
|
|
|
+ if assigned(T.FPGConn) then
|
|
|
+ PQreset(T.FPGConn)
|
|
|
else
|
|
|
- FConnectionPool[0].FPGConn := PQconnectdb(pchar(FConnectString));
|
|
|
- CheckConnectionStatus(FConnectionPool[0].FPGConn);
|
|
|
+ T.FPGConn := PQconnectdb(pchar(FConnectString));
|
|
|
+ CheckConnectionStatus(T.FPGConn);
|
|
|
if CharSet <> '' then
|
|
|
- PQsetClientEncoding(FConnectionPool[0].FPGConn, pchar(CharSet));
|
|
|
- result:=FConnectionPool[0].FPGConn;
|
|
|
+ PQsetClientEncoding(T.FPGConn, pchar(CharSet));
|
|
|
+ result:=T.FPGConn;
|
|
|
end;
|
|
|
|
|
|
function TPQConnection.Fetch(cursor : TSQLCursor) : boolean;
|