-------------------------------------------------------------------------------
--* @author: P.H.Welch.
--* @title: Manager and ALT processes for Multiway Synchronisation (occam-pi).
--* @version: 2.0.
--* @date: 23/10/2005.
--* @acknowledgement: based on ideas from Jim Woodcock and Alistair McEwan.
--* @comment: change from version 1.0:
--*   uses a (RAP) CLAIM on an array of shared channel-ends.
-------------------------------------------------------------------------------
-- Note: this version compiles under KRoC version 1.4.0-pre2.
--
-- See version 0.0 for a simpler version using mechanisms not yet supported
-- by kroc occam-pi.  To compile this:
--
--   kroc -c mm-sync-2-0.occ
-------------------------------------------------------------------------------

-- The following CHAN.INT type is only needed because, currently,
-- kroc does not support an array of SHARED singleton channels.

CHAN TYPE CHAN.INT
  MOBILE RECORD
    CHAN INT c?:
:

--* This process manages the multiway sync guard with index `index'.
--*
--* @param index: the index of *this* multiway sync guard.
--*
--* @param listen: the server channel to this multiway sync manager.
--*
--* @param register: the indices of the application processes synchronising
--*   on this multiway sync.  CONTRACT: these indices are strictly increasing
--*   and are legal indices of the `respond' array.  Also, (SIZE register) > 0.
--*
--* @param respond: the server channels to *all* the application processes.
--*
PROC multi.sync.manager (VAL INT index, CHAN.INT? listen, VAL []INT register,
                         MOBILE []SHARED CHAN.INT! respond)

  VAL INT nack.index IS ~index:
  INITIAL INT count IS SIZE register:
  MOBILE []SHARED CHAN.INT! my.respond:

  SEQ

    -- The following array is set up to save one level of (frequently
    -- occurring) array lookup in the main loop.  It's also necessary for
    -- the (RAP-ordered) CLAIM below.

    my.respond := MOBILE [SIZE register]SHARED CHAN.INT!
    SEQ i = 0 FOR SIZE my.respond
      my.respond[i] := respond[register[i]]

    -- The above sets up `my.respond[i]' as an *alias* of `respond[register[i]]'.
    -- This is safe (?) since they are SHARED MOBILE channel-ends.

    WHILE TRUE
      --* INVARIANT: count is the number of offers still needed
      SEQ

        -- gather offers (nacking cancels)
        WHILE count > 0
          INT offer:
          SEQ
            listen[c] ? offer
            --* ASSERT: element (offer, register)
            IF
              offer >= 0
                count := count - 1
              TRUE
                SEQ
                  count := count + 1
                  CLAIM respond[~offer]
                    respond[~offer][c] ! nack.index
        --* DEDUCE: count = 0

        -- invite and gather commits in parallel
        CHAN INT report:
        PAR

          -- invite commits (in RAP order), get decision, make response
          CLAIM my.respond
            INT decision:
            SEQ
              -- invite commits
              SEQ i = 0 FOR SIZE my.respond
                my.respond[i][c] ! index
              -- get decision
              report ? decision
              -- make response
              SEQ i = 0 FOR SIZE my.respond
                my.respond[i][c] ! decision

          -- gather commits and report decision
          SEQ
            count := SIZE register
            -- gather commits
            SEQ i = 0 FOR SIZE register
              INT commit:
              SEQ
                listen[c] ? commit
                IF
                  commit >= 0
                    count := count - 1
                  TRUE
                    SKIP
            --* DEDUCE: count is the number of offers still needed
            -- report decision and fix count
            IF
              count = 0
                SEQ
                  report ! index           -- i.e. confirm
                  count := SIZE register
              TRUE
                report ! nack.index        -- i.e. deny

:

--* This process performs one ALT over the multiway sync guards indexed
--* by the `guard' array.
--*
--* @param index: the index of *this* ALTing process.
--*
--* @param guard: the indices of the multiway sync guards for this ALT.
--*   CONTRACT: the indices are strictly increasing and are legal indices
--*   of the `push' array.  Also, the multiway sync guards indexed by this
--*   array are a subset of the guards in the alphabet of *this* process.
--*   Also, (SIZE guard) > 0.
--*
--* @param push: the server channels to *all* the multiway sync managers.
--*
--* @param listen: the server channel to this ALTing process.
--*
--* @param chosen: index of the chosen multiway sync guard.
--*
PROC alt.multi.sync (VAL INT index, VAL []INT guard,
                     MOBILE []SHARED CHAN.INT! push, CHAN.INT? listen,
                     RESULT INT chosen)

  VAL INT cancel.index IS ~index:
  INT invite:
  MOBILE []SHARED CHAN.INT! my.push:     -- an optimisation (see below)

  SEQ

    -- The following array is set up to save one level of (frequently
    -- occurring) array lookup in the main logic.  Not sure if worthwhile?

    my.push := MOBILE [SIZE guard]SHARED CHAN.INT!
    SEQ i = 0 FOR SIZE my.push
      my.push[i] := push[guard[i]]

    -- The above sets up `my.push[i]' as an *alias* of `push[guard[i]]'.
    -- This is safe (?) since they are SHARED MOBILE channel-ends.

    -- make offers to everyone
    SEQ i = 0 FOR SIZE my.push
      CLAIM my.push[i]
        my.push[i][c] ! index

    -- wait/confirm cycle
    invite := ~0
    WHILE invite < 0
      SEQ
        listen[c] ? invite               -- inviter has
        CLAIM push[invite]               -- locked
          push[invite][c] ! index        -- the listen
        listen[c] ? invite               -- channel

    -- decision has been made
    chosen := invite

    -- cancel other offers (in parallel with clearing other responses)
    -- (maybe this could be done in sequence?)
    PAR
      -- cancel other offers
      SEQ i = 0 FOR SIZE my.push
        IF
          chosen <> guard[i]
            CLAIM my.push[i]
              my.push[i][c] ! cancel.index
          TRUE
            SKIP
      -- clear responses
      SEQ i = 0 FOR (SIZE guard) - 1
        SEQ
          listen[c] ? invite
          IF
            invite >= 0                  -- inviter has locked
              listen[c] ? invite         -- the listen channel
            TRUE
              SKIP

:
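
-------------------------------------------------------------------------------
-- Usage sketch (NOT part of the original file; an illustration only).
--
-- The PROC below is a minimal, hypothetical example of how an application
-- process might drive `alt.multi.sync' repeatedly.  The names
-- `example.client' and `out' are assumptions introduced here.  The `index',
-- `guard', `push' and `listen' parameters are simply passed through, so the
-- CONTRACTs documented for `alt.multi.sync' still apply.  How the `push' and
-- `listen' channel-ends are allocated and wired to the managers is not shown,
-- since the original file does not specify it.  This sketch has not been
-- compiled against KRoC.
-------------------------------------------------------------------------------

--* Hypothetical client: repeatedly ALTs over its guards and reports each
--* chosen guard index on `out'.
--*
--* @param out: (assumed) channel on which each chosen guard index is reported.
--*
PROC example.client (VAL INT index, VAL []INT guard,
                     MOBILE []SHARED CHAN.INT! push, CHAN.INT? listen,
                     CHAN INT out!)
  WHILE TRUE
    INT chosen:
    SEQ
      -- one complete offer / commit / cancel round over all guards
      alt.multi.sync (index, guard, push, listen, chosen)
      -- `chosen' now holds the index of the multiway sync that fired
      out ! chosen
: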