diff --git a/_drafts/2024-12-26-STM-code.md b/_drafts/2024-12-26-STM-code.md index cc7a46c..09abc70 100644 --- a/_drafts/2024-12-26-STM-code.md +++ b/_drafts/2024-12-26-STM-code.md @@ -227,16 +227,15 @@ The operations `ref-set`, `alter`, and `commute` are all implemented in `Lockin member this.touch() = LockingTransaction.getEx().doEnsure(this) ``` -The last operation for `Ref` is used by `LockingTransaction` to update the value of a `Ref` in the Ref world. +Our final operation in `Ref` is used by `LockingTransaction` to update the value of the `Ref` in the Ref world. Here is where history comes into play. We will expand the history list in two situations: (a) the size of the list is less than the minimum history size; and (b) we had a fault in getting the value of this `Ref` and we are less than the maximum history size. -The fault count is incremented in the `LockingTransaction` code. -We will reset the fault count to zero when we increase the list. +The fault count is incremented in the `LockingTransaction` code; we will reset the fault count to zero when we increase the list. These two conditions are tested for in the `if` clause below. If either condition is satisfied, we add a new `RefVal` node to the list. If neither condition is met we replace the oldest value in the list with the new value and make it the new root node. -(The statement `refVals <- refVals.Next` is doing the heavy lifting here. Draw a map.) +(The statement `refVals <- refVals.Next` is doing the heavy lifting here; it rotates the oldest entry to be the root and changes its value. Draw pictures.) ```F# // Set the value @@ -295,7 +294,7 @@ type LTInfo(initState: LTState, startPoint: int64) = s = LTState.Running || s = LTState.Committing ``` -The the state is an `LTState` value, we store it as an `int64`. +The state is an `LTState` value, but it is stored as an `int64`. We are careful to make changes to the `status` field using `Interlocked` methods bacause other threads may mutate this field. 
Unfortunately, the `Interlocked` methods won't work on `LTState` values, even though they are represented as `int64` values. I decided to stick with using the `LTState` enumeration rather than integer constants, @@ -331,7 +330,7 @@ We keep track of its first-assigned read point, the _start point_, as a marker o // The point at the start of the transaction. let mutable startPoint : int64 = 0L - // Get a new read point value. + // Get/set a new read point value. member this.getReadPoint() = readPoint <- LockingTransaction.lastPoint.incrementAndGet() @@ -400,14 +399,43 @@ And we have a simple boolean check of existence. Note the special case of seeing an LT with no `LTInfo`. This is a transaction that has been stopped and is working on cleanup. -We haven't gotten rid of it yet but we are working on it -- it is not running. +We haven't gotten rid of it yet but we are in the process of doing so; it is not running. + +### The books that need keeping + +In the previous post, we mentioned the collections that `LockingTransaction` needs to maintain. Here they are: + +```F# + // Ref assignments made in this transaction (both sets and commutes). + let vals = Dictionary() + + // Refs that have been set in this transaction. + let sets = HashSet() + + // Ref commutes that have been made in this transaction. + let commutes = SortedDictionary>() + + // The set of Refs holding read locks. + let ensures = HashSet() +``` +And not having a better place to put it, we need an exception to signal that we need to retry: + +```F# + // Cached retry exception. + let retryEx = RetryEx("") +``` + +where we have already defined + +```F# +exception RetryEx of string +``` ### Running a transaction -The primary entry point for running a transaction is the `runInTransaction` static method. -Basically, it creates a new `LockingTransaction` object, sets it as the current transaction, runs the supplied code, and then tries to commit. 
-However, if a transaction is already running on the current thread, we will join the existing transaction. -This happens if the body of an outer `dosync` calls `dosync` again, either directly or indirectly. +The Clojure-level macros `dosync` and `sync` both expand into a call to `LockingTransaction.runInTransaction`. +This method creates a new `LockingTransaction` object, sets it as the current transaction, runs the supplied code, and then tries to commit. However, if a transaction is already running on the current thread, we will join the existing transaction; +this happens if the body of an outer `dosync` calls `dosync` again, either directly or indirectly. ```F# static member runInTransaction(fn:IFn) : obj = @@ -431,15 +459,14 @@ This happens if the body of an outer `dosync` calls `dosync` again, either direc ``` So `runInTransaction` takes care of seeing if we are already in a transaction and creating / registering one if not. -The real action is in `run`: +The real work is in `run`: ```F# member this.run(fn:IFn) : obj = let mutable finished = false let mutable ret = null let locked = ResizeArray() - let notify = ResizeArray() - + let mutable i = 0 while not finished && i < RetryLimit do @@ -460,10 +487,11 @@ The real action is in `run`: with | :? RetryEx -> () - | ex when not (LockingTransaction.containsNestedRetryEx(ex)) -> reraise() + | ex when (LockingTransaction.containsNestedRetryEx(ex)) -> () + | _ -> reraise() finally - // CODE TO CLEAN UP AFTER EACH ATTEMPT, whether successful or not - + + // CODE TO CLEAN UP AFTER EACH ATTEMPT, whether successful or not i <- i + 1 @@ -472,6 +500,341 @@ The real action is in `run`: ret ``` +The main body of `run` is a loop that will keep (re)trying the body (`fn.invoke()`) until the commit code succeeds (that will set the `finished` flag to `true`) or we iterate the maximum number of times. +If a `RetryEx` is thrown during either the invocation or the commit, we catch it and continue the loop. 
+If we exit the loop and `finished` is still `false`, we throw an exception: we have failed to commit the transaction within the allotted number of attempts. + +BTW, that's a lot of failures: + +```F# + // The number of times to retry a transaction in case of a conflict. + [] + let RetryLimit = 10000 +``` + +As noted earlier, we do create a new `LTInfo` object for each attempt. +Before looking at the commit code that was elided above, let's look at what the function invocation might be doing. + + +### Running the body + +We earlier showed methods in `Ref` that forward action to the LT that's running on this thread: + +```F# + member this.set(v: obj) = LockingTransaction.getEx().doSet(this, v) + member this.commute(fn: IFn, args: ISeq) = LockingTransaction.getEx().doCommute(this, fn, args) + member this.alter(fn: IFn, args: ISeq) = + let t = LockingTransaction.getEx() + t.doSet(this, fn.applyTo(RTSeq.cons(t.doGet(this), args))) + member this.touch() = LockingTransaction.getEx().doEnsure(this) +``` + +Also, `Ref`'s implementation of `IDeref.deref` calls `doGet`. Let's start with the easiest one: `doGet`. + +```F# + member this.doGet(r: Ref) : obj = + if info.Value.isRunning then + if vals.ContainsKey(r) then + vals.[r] + else + let valOpt = + try + r.enterReadLock() + + let rec loop (ver:RefVal) : obj option = + if ver.Point <= readPoint then + Some ver.Value + elif Object.ReferenceEquals(ver.Prior, r.getRVals()) then + None + else + loop ver.Prior + + loop (r.getRVals()) + + finally + r.exitReadLock() + + match valOpt with + |None -> + // no version of val precedes the read point + r.addFault() + raise retryEx + |Some v -> v + else + raise retryEx +``` +First, we check to see if this transaction is still running. It was running when `getEx` was called, but some other thread might have sneaked in and barged us in between then and now. Checking here corresponds to our earlier promise to always check our status. 
+If our transaction is no longer running, we throw a `RetryEx`. + +If we have done a `ref-set` or `alter` in this transaction, we will have an in-transaction value stored in `vals`, so we look for that. Otherwise, we need to go to the `Ref` `r` and see what value it has. We do not necessarily need the latest value, just any value with a timestamp (`ver.Point`) no later than the timestamp we are working under (`readPoint`). The function `loop` is a simple iteration through the doubly-linked list. It returns an `obj option` so we can signal 'no version in the history precedes the read point'. In that case, we mark on the Ref that a fault has occurred and throw for a retry. If we do find a value, we return it. + +Okay. That's the easy one. `doCommute` is not much harder. + +```F# + member this.doCommute(r:Ref, fn:IFn, args:ISeq) : obj = + if not info.Value.isRunning then + raise retryEx + + if not (vals.ContainsKey(r)) then + let v = + try + r.enterReadLock() + r.currentVal() + finally + r.exitReadLock() + vals[r] <- v + + let mutable fns : ResizeArray = null + if not (commutes.TryGetValue(r, &fns)) then + fns <- ResizeArray() + commutes[r] <- fns + fns.Add({ fn = fn; args = args }) + let v = fn.applyTo(RTSeq.cons(vals[r], args)) + vals[r] <- v + v +``` + +Check to make sure our transaction is still running. Check to see if we have an in-transaction value set. If not, grab the current value of the Ref and store it as the in-transaction value (for just a moment). +Create an entry for this Ref in `commutes` with an empty list as the associated value if no entry exists, then append the function and its arguments to that list. +And then run the commute action using the present value of the Ref and store that result as the in-transaction value. +The only locking required is a brief hold of a read lock on the Ref to grab the current value. + +`doEnsure` is the first place we look at checking for the 'stamp' on a Ref: + +```F# + // Touch a ref. (Lock it.) 
+ member this.doEnsure(r: Ref) = + if info.Value.isRunning then + if ensures.Contains(r) then + () + else + r.enterReadLock() + + // someone completed a write after our snapshot + if r.currentPoint() > readPoint then + r.exitReadLock() + raise retryEx + + match r.TxInfo with + | Some refinfo when refinfo.isRunning -> + r.exitReadLock() + if not <| Object.ReferenceEquals(refinfo,info) then + this.blockAndBail(refinfo) + | _ -> ensures.Add(r) |> ignore + + else + raise retryEx +``` + +Check to make sure our transaction is still running. If the Ref is already ensured, this is a no-op. +Otherwise, we get a read lock to check the current value of the Ref. First, if someone has done a real commit after we started, we are going to retry -- and exit the read lock before throwing. Second, we look at `r.TxInfo` to see if someone has put a stamp on it. +If so, we release the read lock. Now the tricky part I mentioned in the previous post. If someone else has the stamp, we are screwed, so we block-and-bail. If not, then the stamp is our stamp -- we just get out -- this is a no-op. If there is no stamp or the stamper is not running, then we add the Ref to our `ensures` collection. Phew. + +And now the monster: + +```F# + // Set the value of a ref inside the transaction. + member this.doSet(r: Ref, v: obj) = + if info.Value.isRunning then + if commutes.ContainsKey(r) then + raise <| InvalidOperationException("Can't set after commute") + if not (sets.Contains(r)) then + sets.Add(r) |> ignore + this.lock(r) |> ignore + vals.[r] <- v + v + else + raise retryEx +``` + +Check to make sure our transaction is still running. If the Ref has had a commute call on it already, this is an illegal operation. +If the Ref is not already in our `sets` collection, then we add it to the collection and call `lock`. +And record the in-transaction value for the Ref in the `vals` map. + +That wasn't so bad. + + The `lock` method is hiding the nastiness. 
+ +```F# + member private this.lock(r : Ref) = + + // can't upgrade read lock, so release it. + this.releaseIfEnsured(r) + + let mutable unlocked = true + try + this.tryWriteLock(r) + unlocked <- false + + if r.currentPoint() > readPoint then + raise retryEx + + let success() = + r.TxInfo <- info + r.currentVal() + + match r.TxInfo with + | Some (refinfo:LTInfo) when refinfo.isRunning && not <| obj.ReferenceEquals(refinfo, info) -> + if this.barge(refinfo) then + success() + else + r.exitWriteLock() + unlocked <- true + this.blockAndBail(refinfo) + | _ -> success() + finally + if not unlocked then + r.exitWriteLock() +``` + +The first step is to call `releaseIfEnsured`. This checks if the Ref is in the `ensures` collection. If it is, we remove it from the collection _and release its read lock_. This last piece is critical. A Ref is in `ensures` if and only if we hold a read lock on it. + +```F# + member private this.releaseIfEnsured(r: Ref) = + if ensures.Contains(r) then + ensures.Remove(r) |> ignore + r.exitReadLock() +``` + +The next thing is to try to get a write lock. If that times out, an exception is thrown. The `unlocked` flag controls whether we need to release the write lock on our way out of the method. + +Now we see some checks that may be starting to look familiar. If the Ref's value in the real world has been set subsequent to the start of our attempt, retry. Next, check the stamp on the Ref. If there is a stamp and the stamper is running and it is not us, we have some work to do. (Next paragraph.) Otherwise, put our stamp on it and return the current value. (Why return the current value? It just gets ignored. I don't know.) + +If there is a running stamper that is not us, well, one of us has got to go. We try to barge the other guy. If we do, then all is good -- declare success. If we cannot barge the other guy, then we have to kill our attempt and retry -- block-and-bail. + +One little subtlety here. 
Note that if we don't barge, we release the write lock (and set the indicator flag) before calling block-and-bail, even though the `finally` clause would release the write lock for us otherwise. This gives other threads a chance to get a lock (read or write) before we finish the block-and-bail. We're just being polite. + +Barging is relatively straightforward. + +```F# + member private this.barge(refinfo: LTInfo) = + let mutable barged = false + + // if this transaction is older, try to abort the other + if this.bargeTimeElapsed && startPoint < refinfo.StartPoint then + barged <- refinfo.compareAndSet(LTState.Running, LTState.Killed) + if barged then + refinfo.Latch.CountDown() + barged +``` + +First, we don't try to barge unless we have been around for a while. A little patience is called for. + + +```F# + // How old our transaction must be before we 'barge' it. + // Java version has BARGE_WAIT_NANOS, set at 10*1_000_000. + // If I'm thinking correctly tonight, that's 10 milliseconds. + // Ticks here are 100 nanos, so we should have 10 * 1_000_000/100 = 100_000. + [] + let BargeWaitTicks = 100_000L + + // Determine if sufficient clock time has elapsed to barge another transaction. + member private _.bargeTimeElapsed = (int64 Environment.TickCount) - startTime > BargeWaitTicks +``` + +Second, we only barge the other transaction if we are older. This compares start points instead of read points, so this dates back to the first attempts of each transaction, not the current retry of each. (To determine who is older, look at dates of birth, not who had the most recent birthday.) The `compareAndSet` uses an `Interlocked` method. Why might this fail? The other transaction was running just before we got here. Well, someone else may have barged it, or it may already be committing or have committed or somehow gotten aborted. In those cases, even if it might be okay to proceed, it's too hard to tell, so let's just retry (via the block-and-bail that will follow). 
If we do barge, we trigger the latch on the other transaction's `LTInfo` on our way out. More on this in a minute (actually, 100 milliseconds). + +The block-and-bail operation is short: + +```F# + member private this.blockAndBail(refinfo: LTInfo) = + this.stop(LTState.Retry) + try + refinfo.Latch.Await(LockWaitMsecs) |> ignore + with + | :? ThreadInterruptedException -> () + raise retryEx +``` + +The `stop` call does some cleanup to get us ready for a retry: + +```F# + + // Stop this transaction. + member private this.stop(state: LTState) = + match info with + | None -> () + | Some sinfo -> + lock sinfo (fun () -> + sinfo.State <- state + sinfo.Latch.CountDown()) + info <- None + vals.Clear() + sets.Clear() + commutes.Clear() +``` + + Next we wait (at least for a little bit) for the other transaction's latch to count down. + Why wait? We just decided to block-and-bail. This only happens when we've run into a conflict and we've lost the battle. + If we just barreled through and started our retry immediately, the other transaction might not have had time to finish and quit running (either completely or that iteration) -- we'd just get locked again. Why not just wait? Why is the latch involved? + Well, if the other transaction gets barged or if it transitions itself to a new state (this happens in `stop`), the latch will count down. That can release us to proceed more quickly than `LockWaitMsecs`. + +### Do not fear commitment + +We're almost done. Back in `run`, we left a hole to fill: `// CODE TO COMMIT COMES HERE`. 
+ +```F# + if newLTInfo.compareAndSet(LTState.Running,LTState.Committing) then + + for pair in commutes do + let r = pair.Key + if sets.Contains(r) then + () + else + let wasEnsured = ensures.Contains(r) + // can't upgrade read lock, so release + this.releaseIfEnsured(r) + this.tryWriteLock(r) + locked.Add(r) + + if wasEnsured && r.currentPoint() > readPoint then + raise retryEx + + match r.TxInfo with + | Some refinfo when refinfo <> newLTInfo && refinfo.isRunning -> + if not (this.barge(refinfo)) then + raise retryEx + | _ -> () + + let v = r.currentVal() + vals.[r] <- v + for f in pair.Value do + vals.[r] <- f.fn.applyTo(RTSeq.cons(vals.[r], f.args)) + + for r in sets do + this.tryWriteLock(r) + locked.Add(r) + + // at this point, all values calced, all refs to be written locked + // no more client code to be called + let commitPoint = LockingTransaction.getCommitPoint() + for pair in vals do + let r = pair.Key + let oldval = r.currentVal() + let newval = pair.Value + r.setValue(newval, commitPoint) + + finished <- true + newLTInfo.set(LTState.Committed) |> ignore +``` + +We try to set our state from `Running` to `Committing`; if this fails, we got barged and we don't try to commit. +For each commuted Ref, we check if it was ensured (unlock it if so), get a write lock, then apply all the commute computations in succession, ending up with a final in-transaction value for the Ref. Then get write locks on all the `sets` Refs. + +At this point, all the values are calculated and stored in the `vals` map and we have write locks on the Refs. Set the values on all the Refs in the real world. Because we have a write lock, no one will see any of these changes until we are done. At the end, we change our state to `Committed`. + +Last and definitely not least: `// CODE TO CLEAN UP AFTER EACH ATTEMPT, whether successful or not`. This code is in a `finally` block. Whether we were killed, otherwise failed, or committed, we have to clean up any locks we might be holding. 
Those are WriteLocks in the `locked` collection (only if we committed) and all the read locks on the ensured Refs (whether we committed or are retrying). + +```F# + for k = locked.Count - 1 downto 0 do + locked.[k].exitWriteLock() + locked.Clear() + for r in ensures do + r.exitReadLock() + ensures.Clear() + this.stop(if finished then LTState.Committed else LTState.Retry) +``` -The `finished` flag will be set to `true` in the commit code if the transaction is successful. We'll see that code in just a moment. +And that, I hope, suffices.