Sunday, 3 November 2013

CONCURRENCY CONTROL IN DISTRIBUTED DATABASE SYSTEMS


1. DESCRIPTION OF THE PROBLEM
Today's Database Management Systems (DBMSs) work in a multiuser environment where users access the database concurrently. Therefore, a DBMS must control the concurrent execution of user transactions so that the overall correctness of the database is maintained. A transaction is a user program accessing the database. Research in database concurrency control has advanced in a different direction from seemingly related areas such as operating systems concurrency.
Database concurrency control permits users to access a database in a multiprogrammed fashion while preserving the illusion that each user is executing alone on a dedicated system. The main difficulty in achieving this goal is to prevent database updates performed by one user from interfering with database retrievals and updates performed by another.
As an example, consider an on-line airline reservation system. Suppose two customers, Customer A and Customer B, simultaneously try to reserve a seat on the same flight. In the absence of concurrency control, these two activities could interfere as illustrated in Figure 1. Let seat No. 18 be the first available seat. Both transactions could read the reservation information at approximately the same time, each reserve seat No. 18 for its own customer, and store the result back into the database. The net effect is incorrect: although two customers reserved a seat, the database reflects only one activity; the other reservation is lost by the system.
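The anomaly is easy to reproduce in code. The following sketch (hypothetical Python; the seat counter and variable names are illustrative, not part of any real reservation system) replays the interleaving of Figure 1: both transactions read the seat information before either writes its result back, so one of the two updates is lost.

    # Hypothetical sketch of the lost-update anomaly of Figure 1.
    db = {"first_free_seat": 18}

    # Both transactions read the reservation information at about the
    # same time, before either has stored its result back.
    seat_read_by_a = db["first_free_seat"]   # Customer A's transaction reads 18
    seat_read_by_b = db["first_free_seat"]   # Customer B's transaction reads 18

    # Each transaction reserves the seat it read and stores the result.
    db["first_free_seat"] = seat_read_by_a + 1   # A reserves seat No. 18
    db["first_free_seat"] = seat_read_by_b + 1   # B also "reserves" seat No. 18

    # Two customers were served, but the database reflects only one
    # reservation: the counter is 19 instead of 20.
    print(db["first_free_seat"])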
As is apparent from the example, it is necessary to establish a correctness criterion for the execution of concurrent user transactions. Serializability is that criterion. The execution of concurrent transactions, termed a history or a log, is serializable if it produces the same output and has the same effect on the database as some serial execution of the same transactions. A log is serial if, for every pair of transactions, all of the operations of one transaction execute before any of the operations of the other. However, deciding whether an equivalent serial log exists is an NP-complete problem; that is, there is no known algorithm which decides in polynomial time whether a given log is serializable.
Since the serializability problem is NP-complete, several subclasses of serializable logs having polynomial-time membership tests have been introduced. The popular subclasses are: the class of serial logs (class S), the class of logs produced by two-phase locking schedulers (class 2PL), the class of logs produced by basic timestamp ordering schedulers (class BTO), and the class of conflict-preserving serializable logs (class CPSR), whose scheduler is based on serialization graph testing (SGT).


Figure 1. Example of an anomaly in database in the absence of concurrency control.

Concurrency control in DBMSs is achieved by a program, called the scheduler, whose goal is to order the operations of transactions in such a way that the resulting log is serializable. In practice, 2PL is the most popular scheduling technique for centralized DBMSs. However, for distributed DBMSs, 2PL induces a high communication cost because of the deadlock problem. Therefore, improved algorithms for concurrency control in distributed DBMSs are an active research area in database theory.
Theoretically, the class CPSR had been the most attractive log class until 1987, because it was the largest known class of serializable logs in P. However, in 1987 a new class of serializable logs in P, called WRW, was introduced and proved to be a proper superset of CPSR. At almost the same time the class HD was introduced and proved to be a proper superset of WRW, which makes HD the largest known serializable log class in P.
2. DATABASE SYSTEM MODEL
A database is a structured collection of data items, denoted {..., x, y, z}, that can be accessed concurrently by several transactions. The size of the data contained in a data item is called the granularity of the data item. The granularity is not important for the scope of this study; in practice it could be a file, a record of a file, a field of a record or a page of a disk. The values of the data items at any time comprise the state of the database. A Database System (DBS) is a collection of software and hardware modules that supports the operations performed by the transactions. Users interact with the DBS through transactions. A transaction interacts with the outside world only by issuing read and write operations to the DBS or by doing terminal I/O.
Users access data items by issuing read and write operations. A transaction, denoted Ti, is a set of operations on data items that transforms a database from one consistent state to another. That is, transactions are assumed to be complete and correct computation units: each transaction, if executed alone on an initially consistent database, would terminate, produce correct results and leave the database in a consistent state.
A DBS consists of the following modules: the Transaction Manager (TM), the Data Manager (DM) and the scheduler (Figure 2). A Distributed Database System (DDBS) is a collection of sites connected by a communication network, where each site is simply a DBS. In a DDBS each site runs one or more of the following software modules: a TM, a DM or a scheduler. The schedulers may be distributed, that is, there may be a local scheduler at each site. However, the local schedulers must cooperate to preserve the consistency of the database; the distributed schedulers must behave as if there were a single global scheduler in the system (Figure 3). The TM preprocesses the operations it receives from transactions, the DM manages the actual database, and the scheduler controls the relative order in which these operations are executed.
Figure 2. Database System Model
Figure 3. Distributed Database System Model
Each transaction issues its operations to a single TM, which forwards them to the scheduler. In a DDBS, the TM is also responsible for determining which scheduler should process an operation submitted by a transaction.
The scheduler is responsible for the consistency of the database. However, a scheduler pays no attention to the computations performed by the transactions; it makes its decisions solely by considering the type of each operation and the data item it accesses. The scheduler controls the order in which the DMs process the read and write operations. When a scheduler receives a read or write operation, it can output the operation to the related DM, delay the operation by holding it for later action, or reject the operation. If an operation of a transaction is rejected, the transaction should be aborted. Furthermore, every transaction that read a value written by the aborted transaction should also be aborted. This phenomenon, where one abort triggers other aborts, is called cascading abort, and it is usually avoided by not allowing a transaction Ti to read another transaction Tj's output until Tj is committed, that is, until the DBS is certain that Tj will not be aborted. Therefore an incomplete transaction cannot reveal its results to other transactions before its commitment, a property called isolation.
Usually, a scheduler decides whether to accept, reject or delay an operation each time it receives one. Another approach is to schedule each operation immediately as it is received and to apply a validation test when the transaction terminates. If the validation test succeeds, the transaction is committed; otherwise it is aborted. Such schedulers are called optimistic schedulers, because they optimistically assume that transactions will not need to be aborted. These schedulers are also called certifiers.
The DM executes each read and write operation it receives. For a read operation, DM looks into its local database and returns the requested value. For a write operation, the DM modifies its local database and returns an acknowledgment. The DM sends the returned value or acknowledgment to the scheduler, which relays it back to the TM, which relays it back to the transaction.
The read and write operations performed by transaction Ti on a data item x are denoted Ri[x] and Wi[x] respectively. The read operation Ri[x] returns the value stored in data item x to transaction Ti, and the write operation Wi[x] changes the value of data item x to the one computed by Ti. Read operations have no effect on the consistency of the database. However, since write operations update the values of data items, they cause a change in the state of the database. Two operations belonging to different transactions conflict if they operate on the same data item and at least one of them is a write. The read set of a transaction, denoted S(Ri), is the set of data items the transaction reads, and the write set, denoted S(Wi), is the set of data items it writes. The access set or base set of a transaction is the union of its read and write sets. When two or more transactions execute concurrently, their operations are executed in an interleaved fashion.
Each transaction starts with a begin operation and ends with a commit or abort operation. Commit indicates that the transaction has completed its execution and its effects on the database, that is, every write operation processed on its behalf, should be made permanent. Abort indicates that the transaction has terminated abnormally and its effects should be undone by restoring the old values of the data items.
The most common commitment protocol is two-phase commitment (2PC). In the first phase of 2PC, the values of the data items in the write set of the transaction are copied into secure storage at the related sites without overwriting the old values. If the first phase terminates successfully, the transaction commits and cannot be aborted from that point on. The commit message is then sent to the related sites, and the effects of the transaction are made permanent by writing the values from secure storage into the actual database. If a failure occurs during the first phase of 2PC, the transaction is aborted by simply discarding the values copied into secure storage. If a site fails during the second phase, there is no need for abortion; the values copied into secure storage are written into the actual database when the failed site recovers. With 2PC, cascading aborts are also avoided, because the write operations are applied to the database only when the transactions commit.
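The protocol can be summarized in a short sketch (hypothetical Python; message passing and secure storage are simulated with plain method calls and dictionaries, which glosses over the failure handling a real implementation needs).

    # Hypothetical sketch of two-phase commitment (2PC). In phase one each
    # related site stages the new values in secure storage without
    # overwriting the database; only if every site succeeds are the staged
    # values made permanent in phase two.

    class Site:
        def __init__(self):
            self.database = {}
            self.secure_storage = {}    # staged writes, assumed crash-safe

        def prepare(self, writes):
            try:
                self.secure_storage.update(writes)   # old values untouched
                return True                          # vote: ready to commit
            except Exception:
                return False                         # vote: abort

        def commit(self):
            self.database.update(self.secure_storage)   # make permanent
            self.secure_storage.clear()

        def abort(self):
            self.secure_storage.clear()   # simply discard staged values

    def two_phase_commit(sites_writes):
        # Phase 1: stage the write set at every related site.
        if all(site.prepare(writes) for site, writes in sites_writes):
            # Phase 2: the transaction is committed and cannot abort now.
            for site, _ in sites_writes:
                site.commit()
            return "committed"
        for site, _ in sites_writes:
            site.abort()
        return "aborted"

    s1, s2 = Site(), Site()
    print(two_phase_commit([(s1, {"x": 1}), (s2, {"y": 2})]))   # committed

Because the writes reach the actual database only in the second phase, no other transaction can ever read the output of an uncommitted transaction, which is how 2PC avoids cascading aborts.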
Let T = {T0, T1, ..., Tn} be a set of transactions. A log (or history) over T is an execution in which the operations of the transactions are interleaved such that the order of the operations within each transaction is preserved.
For example, for T={T1,T2} let
T1= B1 R1[x] R1[y] W1[x] E1
T2= B2 R2[x] W2[x] E2.
Here the operations Bi and Ei, i = 1, 2, denote the beginning and the end of the transactions. Some possible logs over T are
L1= B1 R1[x] R1[y] B2 R2[x] W1[x] E1 W2[x] E2 or
L2= B2 R2[x] B1 R1[x] R1[y] W1[x] E1 W2[x] E2.

However the execution
B2 R2[x] B1 R1[y] R1[x] W1[x] E1 W2[x] E2
is not a log over T, since the order of the operations R1[x] and R1[y] of T1 is not preserved.
Let L be a log over some set T, and suppose Wi[x] and Rj[x] are operations in L. We say that Tj reads x from Ti if Wi[x] < Rj[x] (i.e., Wi[x] precedes Rj[x]) and no Wk[x] falls between Wi[x] and Rj[x]. In the log
L3 = W0[x] R1[x] W2[x] R3[x] R4[x],
where the begin and end operations are not shown for simplicity, T1 reads x from T0, and T3 and T4 read x from T2.
We call Wi[x] a final write in L if no Wk[x] follows it. In the log
L4= W0[x] R2[x] W1[y] W2[y],
W0[x] and W2[y] are final writes.
Two logs over T are equivalent if they represent the same computation; that is, in both logs each transaction reads the value written by the same transaction, and both logs have the same final writes.
To account for the initial values of the data items, logs are augmented by an initial transaction Ts, which writes all the data items first; to account for the effect of the final writes, they are augmented by a final transaction Tf, which reads all the data items last. Augmented with the initial and final transactions, log L4 becomes
L4a= Ws[x] Ws[y] W0[x] R2[x] W1[y] W2[y] Rf[x] Rf[y]
Two logs are equivalent if they have the same read-from relations when they are augmented by the initial and final transactions.
The log L4 is equivalent to log
L5= W0[x] W1[y] R2[x] W2[y],
because they have the same transaction set and the augmented logs L4a and L5a have the same read-from relations. However, L4 is not equivalent to the log
L6= R2[x] W0[x] W2[y] W1[y],
since T2 reads x from T0 in L4a but from Ts in L6a, and Tf reads y from T2 in L4a but from T1 in L6a.
A log is serializable if there exists an equivalent serial log.
The log L4 is serializable because it is equivalent to L5, which is serial. Notice that in L5 the operations of the transactions are not interleaved.
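These definitions can be checked mechanically. The sketch below (hypothetical Python; operations are written as strings in the Ri[x]/Wi[x] notation used here, with begin and end operations omitted) augments a log with Ts and Tf, computes its read-from relation, and decides equivalence and, by brute force over all serial orders, serializability.

    from itertools import permutations

    # An operation is a string such as "R2[x]" or "W0[x]".
    def parse(op):
        kind, rest = op[0], op[1:]
        txn, item = rest.split("[")
        return kind, txn, item.rstrip("]")

    def augment(log):
        # Prepend the initial transaction Ts, append the final one Tf.
        items = sorted({parse(op)[2] for op in log})
        return (["Ws[" + x + "]" for x in items] + list(log)
                + ["Rf[" + x + "]" for x in items])

    def reads_from(log):
        # The set of pairs ((reader, item), writer) of the augmented log.
        relation, last_writer = set(), {}
        for op in augment(log):
            kind, txn, item = parse(op)
            if kind == "W":
                last_writer[item] = txn
            else:
                relation.add(((txn, item), last_writer[item]))
        return relation

    def equivalent(log1, log2):
        return reads_from(log1) == reads_from(log2)

    def serializable(log):
        txns = sorted({parse(op)[1] for op in log})
        for order in permutations(txns):          # try every serial order
            serial = [op for t in order for op in log if parse(op)[1] == t]
            if equivalent(log, serial):
                return True
        return False

    L4 = ["W0[x]", "R2[x]", "W1[y]", "W2[y]"]
    L5 = ["W0[x]", "W1[y]", "R2[x]", "W2[y]"]
    L6 = ["R2[x]", "W0[x]", "W2[y]", "W1[y]"]
    print(equivalent(L4, L5), equivalent(L4, L6))   # True False
    print(serializable(L4))                         # True

The brute-force test tries all n! serial orders, which is exactly why the polynomial-time subclasses listed in Section 1 are of interest.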
3. TWO-PHASE LOCKING METHOD
The two-phase locking (2PL) scheduler is the most popular type of scheduler in commercial products. The 2PL technique uses two types of locks on the data items: read locks and write locks. A read lock is a shared lock whereas a write lock is exclusive. That is, a transaction can hold a read lock on a data item only if no other transaction holds a write lock on it, and a transaction can hold a write lock on a data item only if no other transaction holds a read or write lock on it.
In a database system using the 2PL mechanism for concurrency control, each transaction obeys the following rules:
    1. a transaction does not request a read or write lock on a data item if it already has that type of lock on that data item;
    2. a transaction must have a read lock on a data item before reading the data item and it must have a write lock on a data item before writing the data item;
    3. a transaction does not request any lock after it has released a lock.
Each transaction obeying the third rule has two phases, which is why the technique is called two-phase locking. During the first phase, called the growing phase, a transaction obtains its locks without releasing any lock. The point at the end of the growing phase, when a transaction owns all the locks it will ever own, is called the locked point of the transaction. During the second phase, called the shrinking phase, the transaction releases the locks it obtained in the first phase; it cannot request any new lock in this phase. When the transaction terminates, all the locks it still holds are released. The locked points of the transactions in a log L determine the serialization order of the transactions.
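A minimal sketch of the locking rules (hypothetical Python; a real scheduler would queue a blocked request and retry it later, while this sketch merely reports the conflict):

    # Hypothetical sketch of a 2PL lock manager: shared read locks,
    # exclusive write locks, and rule 3 (no lock after a release).

    class TwoPhaseLockManager:
        def __init__(self):
            self.read_locks = {}     # item -> set of holding transactions
            self.write_locks = {}    # item -> holding transaction
            self.shrinking = set()   # transactions past their locked point

        def lock(self, txn, item, mode):
            if txn in self.shrinking:
                raise RuntimeError("rule 3: no lock request after a release")
            readers = self.read_locks.get(item, set())
            writer = self.write_locks.get(item)
            if mode == "read":
                if writer not in (None, txn):
                    return False     # another txn holds a write lock: block
                self.read_locks.setdefault(item, set()).add(txn)
            else:
                if writer not in (None, txn) or readers - {txn}:
                    return False     # some other lock exists: block
                self.write_locks[item] = txn
            return True

        def release(self, txn, item):
            self.shrinking.add(txn)  # txn enters its shrinking phase
            self.read_locks.get(item, set()).discard(txn)
            if self.write_locks.get(item) == txn:
                del self.write_locks[item]

    lm = TwoPhaseLockManager()
    print(lm.lock("Ta", "x", "read"))    # True  (read locks are shared)
    print(lm.lock("Tb", "x", "read"))    # True
    print(lm.lock("Ta", "x", "write"))   # False (Tb still reads x)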
Two-phase locking is sufficient to preserve serializability. However, 2PL is not sufficient to preserve isolation. If a transaction Ti releases some of its write locks before its commitment, then some other transaction Tj may read the values it wrote. If Ti is then aborted, Tj and all the other transactions that have read some data item from Ti must also be aborted. In order to guarantee isolation, transactions are therefore required to hold all of their locks until commitment.
The log
L7= Ba Ra[x] Bb Rb[y] Wa[x] Ea Wb[x] Wb[y] Eb
is in the class 2PL; all the operations are accepted by the 2PL scheduler without any change in their order. On the other hand, the log
L8= Ba Ra[x] Bb Rb[x] Wa[x] Wa[y] Ea Wb[y] Eb
is not in the class 2PL, since Ta cannot obtain the write lock on x before Tb has released its read lock on x, which happens at the Eb operation. The 2PL scheduler does not accept the Wa[x] request but blocks it, so the output of the 2PL scheduler for L8 will be
L8_2PL= Ba Ra[x] Bb Rb[x] Wb[y] Eb Wa[x] Wa[y] Ea
Deadlock is the most important problem to be handled when a locking scheme is used for concurrency control. A scheduler must be able to discover whether there is a deadlock in the system. In the case of a deadlock, one of the transactions involved should be chosen as a victim and aborted. Deadlocks can be detected by using wait-for graphs or timeouts.
The nodes of the wait-for graph are the transactions, and there is an edge from Ti to Tj if and only if transaction Ti is waiting for a lock held by transaction Tj. The wait-for graph is cyclic if and only if there is a deadlock in the system. In distributed database systems, there may be a system-wide deadlock even though no deadlock exists when each site is considered separately. Therefore a global deadlock detection mechanism, which requires the execution of more complex algorithms, is necessary for distributed database systems. For global deadlock detection, either each site periodically sends its local wait-for graph to the site hosting a global deadlock detector, where a global wait-for graph is constructed by combining the local ones, or information about the paths in the local wait-for graphs is sent to the related sites as necessary. Global deadlock detection requires the transmission of many messages among the sites, which constitutes an important portion of the communication cost. Furthermore, in some situations the system may decide that there is a deadlock although none exists. This phenomenon is called a phantom deadlock, and it is caused by the delay between the occurrence of events and the processing of the messages that report them.
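Detecting a cycle in the wait-for graph is a standard depth-first search; a minimal sketch (hypothetical Python, with the graph given as an adjacency dictionary):

    # Hypothetical sketch: an edge Ti -> Tj means Ti waits for a lock
    # held by Tj; a deadlock exists iff the wait-for graph has a cycle.

    def has_deadlock(wait_for):
        WHITE, GRAY, BLACK = 0, 1, 2        # unvisited / on path / done
        color = {}

        def visit(t):
            color[t] = GRAY
            for u in wait_for.get(t, ()):
                c = color.get(u, WHITE)
                if c == GRAY:               # back edge: a cycle
                    return True
                if c == WHITE and visit(u):
                    return True
            color[t] = BLACK
            return False

        return any(color.get(t, WHITE) == WHITE and visit(t)
                   for t in wait_for)

    print(has_deadlock({"T1": ["T2"], "T2": ["T1"]}))   # True
    print(has_deadlock({"T1": ["T2"], "T2": []}))       # False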
Starvation, or livelock, is another problem; it arises when a transaction is repeatedly selected as the victim. Such a transaction may never have the opportunity to finish.
Most systems use timeouts for deadlock detection instead of wait-for graphs because of the simplicity of the timeout technique. With this technique, if a transaction waits for a lock longer than a predetermined timeout period, it is assumed that there is a deadlock in the system, and the waiting transaction is aborted. In fact, with the timeout technique the existence of a deadlock is only guessed; it is not an actual deadlock detection mechanism. Therefore, the length of the timeout period should be tuned well. If the timeout period is too short, the system will often decide that there is a deadlock when there is none, which results in unnecessary abortions. If the timeout period is too long, transactions stay in a deadlock state for a long time before the deadlock is detected, which degrades system performance. The starvation problem also remains with the timeout scheme.
Although it is not widely used in distributed systems, deadlock prevention by assigning priorities to the transactions is another solution to the deadlock problem. The priorities of transactions may be decided by their submission time to the system: the transaction submitted earlier has the greater priority. Assume a transaction Ti waits for a lock held by transaction Tj. Wait-die is a strategy which allows Ti to wait only if the priority of Ti is greater than that of Tj; otherwise Ti is aborted. Another strategy is wound-wait, which allows Ti to wait only if the priority of Ti is less than that of Tj; otherwise Tj is aborted. If an aborted transaction keeps its original priority when it is restarted, it becomes older as time passes and its priority increases. Therefore the starvation problem is solved by the wait-die and wound-wait strategies, because they never abort transactions of higher priority. As in the timeout technique, transactions may be aborted although there is no deadlock.
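Both strategies reduce to a single comparison of timestamps, where an earlier (smaller) timestamp means higher priority; a hypothetical sketch:

    # Hypothetical sketch of wait-die and wound-wait. Ti (the requester)
    # waits for a lock held by Tj; smaller timestamp = higher priority.

    def wait_die(ts_i, ts_j):
        # Ti waits only if it is older (higher priority) than Tj,
        # otherwise Ti "dies" and is restarted with its old timestamp.
        return "Ti waits" if ts_i < ts_j else "Ti aborted"

    def wound_wait(ts_i, ts_j):
        # Ti waits only if it is younger (lower priority) than Tj,
        # otherwise Ti "wounds" Tj, i.e. Tj is aborted.
        return "Ti waits" if ts_i > ts_j else "Tj aborted"

    # An old transaction (ts=1) requests a lock held by a young one (ts=9):
    print(wait_die(1, 9))     # Ti waits
    print(wound_wait(1, 9))   # Tj aborted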
Deadlock prevention by preordering the data items is not applicable to 2PL, because in general it is assumed that there is no a priori knowledge about the data items in the read and write sets of the transactions; the read set and the write set of a transaction are revealed only as the transaction executes. Deadlock avoidance techniques are not applicable for the same reason.
4. TIMESTAMP ORDERING METHODS
The timestamp ordering (TO) technique is based on the idea that an operation is allowed to proceed only if all the conflicting operations of older transactions have already been processed; thus the serializability of the transactions is preserved. This requires knowing which transactions are younger than others.
In the implementation of a distributed timing system, each site contains a local clock, or a counter used as a clock. The clock is assumed to tick at least once between any two local events, so the events within a site are totally ordered. For a total ordering of events at different sites, each site is assigned a unique number, and this number is concatenated as the least significant bits to the current value of the local clock. Furthermore, each message carries the local time of its site of origin at the moment it was sent. Upon receiving a message, the receiver advances its local clock to be later than the time carried by the message (a Lamport clock).
In a distributed system having such a clock facility, a timestamp TS(A) is assigned to each event A: the local time at which the event occurs concatenated with the site identifier. TS(A) uniquely identifies the event, and for any two events A and B, if A happened before B then TS(A) < TS(B).
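A minimal sketch of such a clock (hypothetical Python; the site identifier is appended as the least significant component of the timestamp, so comparing the resulting pairs gives the total order):

    # Hypothetical Lamport clock. A timestamp is the pair
    # (local_time, site_id); the unique site id breaks ties.

    class LamportClock:
        def __init__(self, site_id):
            self.site_id = site_id
            self.time = 0

        def tick(self):
            # The clock ticks at least once between any two local events.
            self.time += 1
            return (self.time, self.site_id)

        def send(self):
            # Every message carries the local time at which it was sent.
            return self.tick()

        def receive(self, msg_ts):
            # Advance the local clock past the time carried by the message.
            self.time = max(self.time, msg_ts[0]) + 1
            return (self.time, self.site_id)

    a, b = LamportClock(1), LamportClock(2)
    e1 = a.tick()         # (1, 1)
    msg = a.send()        # (2, 1)
    e2 = b.receive(msg)   # (3, 2): later than the message's timestamp
    print(e1 < msg < e2)  # True: tuple comparison orders all events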
In the timestamp ordering technique, each transaction Ti is assigned a unique timestamp TS(Ti). Each operation issued by Ti carries the same timestamp, and conflicting operations are executed in the order of their timestamps. That is, the transactions obey the TO rule, which states that if pi[x] and qj[x] are conflicting operations belonging to different transactions Ti and Tj, then pi[x] is executed before qj[x] iff TS(Ti) < TS(Tj).
Therefore, the transactions are processed such that their execution is equivalent to a serial log in which the transactions appear in timestamp order.
There are several concurrency control methods based on the timestamp ordering technique. In the basic timestamp ordering (BTO) method, for each data item x, the largest timestamp of any read operation on x and the largest timestamp of any write operation on x, denoted R_TS(x) and W_TS(x) respectively, are stored along with the data item itself. The BTO scheduler processes a Ri[x] operation if W_TS(x) ≤ TS(Ti); otherwise Ri[x] is rejected and Ti is aborted. If Ri[x] is not rejected, it is processed by reading the value of the data item and setting R_TS(x) to MAX(R_TS(x), TS(Ti)). Similarly, the BTO scheduler processes a Wi[x] operation if R_TS(x) ≤ TS(Ti) and W_TS(x) ≤ TS(Ti); otherwise Wi[x] is rejected and Ti is aborted. A processed Wi[x] operation updates the value of the data item x and sets W_TS(x) to MAX(W_TS(x), TS(Ti)). Aborted transactions are restarted with new timestamps.
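The two rules translate directly into code; a hypothetical sketch (timestamps are plain integers and the stored values themselves are omitted):

    # Hypothetical sketch of a BTO scheduler: each data item keeps
    # R_TS(x) and W_TS(x), and an operation arriving "too late" with
    # respect to a conflicting timestamp is rejected.

    class BTOScheduler:
        def __init__(self):
            self.r_ts = {}   # item -> largest read timestamp so far
            self.w_ts = {}   # item -> largest write timestamp so far

        def read(self, ts, item):                 # Ri[x] with TS(Ti) = ts
            if self.w_ts.get(item, 0) > ts:
                return "reject"                   # abort and restart Ti
            self.r_ts[item] = max(self.r_ts.get(item, 0), ts)
            return "process"

        def write(self, ts, item):                # Wi[x] with TS(Ti) = ts
            if self.r_ts.get(item, 0) > ts or self.w_ts.get(item, 0) > ts:
                return "reject"                   # abort and restart Ti
            self.w_ts[item] = max(self.w_ts.get(item, 0), ts)
            return "process"

    # Log L9 below: T1 (ts=1) reads y after T2 (ts=2) has written it.
    s = BTOScheduler()
    s.read(1, "x"); s.read(2, "y"); s.write(2, "y")
    print(s.read(1, "y"))   # reject: W_TS(y) = 2 > 1, so T1 is aborted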
The log
L9=B1 R1[x] B2 R2[y] W2[y] E2 R1[y] W1[x] E1
is not allowed by the BTO scheduler: since B1 < B2 implies TS(T1) < TS(T2), the scheduler rejects R1[y] (because W_TS(y) = TS(T2) > TS(T1)) and aborts T1, although the log is serializable. However, the log
L10= B1 R1[x] B2 R2[y] R1[y] W2[y] E2 W1[x] E1
can be output by the BTO scheduler without any change in the order of the operations.
It may be assumed that transactions are assigned timestamps when their first operation appears in the execution; that is, the timestamps preserve the order of the first read operations of the transactions.
The BTO method can be further improved by ignoring obsolete writes. Let operations Wi[x] and Wm[x] belong to transactions Ti and Tm respectively, with TS(Ti) < TS(Tm). If the scheduler receives Wi[x] after Wm[x] has already been processed, the late operation Wi[x] will be rejected by the BTO scheduler, because W_TS(x) is at least TS(Tm), which is greater than TS(Ti). However, in the case R_TS(x) ≤ TS(Ti), this rejection is unnecessary. If Wi[x] had arrived before Wm[x], no transaction would have read the value written by Wi[x] in the meantime (which follows from R_TS(x) ≤ TS(Ti)), and since Wm[x] would then have overwritten the value written by Wi[x], no transaction could ever read it. The operation Wi[x] is therefore obsolete: the result of processing Wi[x] before Wm[x] is the same as processing Wm[x] alone. In such situations Wi[x] may simply be ignored instead of rejected, which is known as Thomas' write rule (TWR). With TWR, a Wi[x] operation is scheduled as follows: if TS(Ti) < R_TS(x) then Wi[x] is rejected; otherwise, if W_TS(x) < TS(Ti) then Wi[x] is processed, else it is ignored.
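With TWR, only the write rule of the BTO sketch above changes; a hypothetical version:

    # Hypothetical TWR write rule: a late write that no transaction
    # could ever have read is ignored instead of rejected.

    def write_twr(r_ts, w_ts, ts, item):          # Wi[x] with TS(Ti) = ts
        if ts < r_ts.get(item, 0):
            return "reject"      # a younger transaction already read x
        if w_ts.get(item, 0) < ts:
            w_ts[item] = ts
            return "process"
        return "ignore"          # obsolete: it would be overwritten anyway

    r_ts, w_ts = {}, {"x": 2}    # T2 (ts=2) has already written x
    print(write_twr(r_ts, w_ts, 1, "x"))   # ignore: the late W1[x]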

In the log
L11_BTO = B1 R1[x] B2 R2[y] W2[x] E2 (W1[x]) E1,
the operation W1[x] is ignored by Thomas' write rule and treated as if it had been executed just before the W2[x] operation.
The basic timestamp ordering technique is easy to distribute, and the transactions to be aborted are recognized immediately as the operations are scheduled. There is no deadlock problem, because no locks are used and operations are not blocked. However, an atomic commitment mechanism is necessary for reliability, extra space is consumed by the read and write timestamps of the data items, and many transactions are aborted unnecessarily because of the a priori selection of timestamps.
5. SERIALIZATION GRAPH TESTING METHOD (SGT OR CPSR)
The serialization graph of a log L, denoted SG(L), is a directed graph whose nodes are the transactions in the log. In SG(L) there is an edge from node Ti to node Tj if and only if pi[x] and qj[x] are conflicting operations belonging to transactions Ti and Tj respectively and pi[x] precedes qj[x] in the log.
A serialization graph testing (SGT), or conflict-preserving serializability (CPSR), scheduler works by explicitly building a serialization graph. When the SGT scheduler receives an operation pi[x], it adds the node Ti to the graph if it is not already there, and then adds an edge from Tj to Ti for every previously scheduled operation qj[x] that conflicts with pi[x]. If the resulting graph is cyclic, the operation pi[x] is rejected, transaction Ti is aborted, and the serialization graph is restored by removing node Ti together with all its incoming and outgoing edges.
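A sketch of such a scheduler (hypothetical Python; the cycle test is the same kind of depth-first search used for wait-for graphs, done here by checking whether Ti can reach itself):

    # Hypothetical SGT scheduler: edges record conflicts between
    # previously scheduled operations and the new one; an operation
    # that would close a cycle is rejected and its transaction aborted.

    class SGTScheduler:
        def __init__(self):
            self.edges = {}      # txn -> set of successor transactions
            self.history = []    # scheduled operations: (kind, txn, item)

        def _reachable(self, src, dst, seen=None):
            seen = set() if seen is None else seen
            for nxt in self.edges.get(src, ()):
                if nxt == dst:
                    return True
                if nxt not in seen:
                    seen.add(nxt)
                    if self._reachable(nxt, dst, seen):
                        return True
            return False

        def schedule(self, kind, txn, item):
            self.edges.setdefault(txn, set())
            for k, t, i in self.history:    # edge Tj -> Ti per conflict
                if i == item and t != txn and "W" in (k, kind):
                    self.edges[t].add(txn)
            if self._reachable(txn, txn):   # a cycle through Ti: reject
                self.edges.pop(txn)
                for successors in self.edges.values():
                    successors.discard(txn)
                self.history = [op for op in self.history if op[1] != txn]
                return "reject"
            self.history.append((kind, txn, item))
            return "accept"

    # Log L12 below: W2[x] closes the two-node cycle between T1 and T2.
    s = SGTScheduler()
    for op in [("R","T1","x"), ("R","T2","x"), ("W","T1","x"), ("R","T3","x")]:
        s.schedule(*op)
    print(s.schedule("W", "T2", "x"))   # reject: SG(L12) would be cyclic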
The log
L12=B1 R1[x] B2 R2[x] W1[x] E1 B3 R3[x] W2[x] E2 W3[y] E3
is not allowed by the SGT scheduler, since SG(L12) is cyclic (Figure 4).
Figure 4. SG(L12)
While the 2PL and BTO mechanisms totally order the scheduled transactions, the SGT mechanism orders them only partially. This is why any log that can be handled successfully by a 2PL or BTO scheduler can also be handled by the SGT scheduler.
Although the SGT method provides more concurrency than the 2PL and BTO methods, it is not practical to implement in distributed database systems. The problems are similar to those of distributed deadlock detection: it is necessary to check the serialization graph for cycles. If each scheduler maintains a local serialization graph reflecting only the conflicts on the data items it manages, the local graphs may all be acyclic even though the global serialization graph, which reflects all the conflicts in the system, is cyclic. Therefore the local serialization graphs must be combined at some scheduler site to construct the global serialization graph. Such a scheme incurs a high communication cost, the delays caused by message transmission are more drastic than in the case of deadlock detection, and a crash of the scheduler site is not easy to handle.
Another way of handling the global serialization graph is to hold the immediate successors of each transaction at its home site and to distribute the graph by means of messages containing this information. However, if global cycles are likely to occur, it is again quite expensive to search the whole graph in this way.
QUESTIONS
1. The following two logs are given for a distributed database management system:
L1: B1 R1[x] B2 R2[x] W1[x] E1 W2[y] E2
L2: B2 R2[x] W2[y] E2 B1 R1[x] W1[x] E1
a. Find the read-from sets of L1 and L2
b. Is L1 equivalent to L2? Why?
c. Is L1 serial? Is L2 serial? Why?
d. Is L1 serializable? Is L2 serializable? Why?
e. Is L1 2PL? Why?
f. Is L1 BTO? Why?
