namespace System
namespace System.Threading
namespace System.Threading.Tasks
namespace System.Net
namespace MBrace
namespace MBrace.Core
namespace MBrace.Core.Internals
module BuilderAsyncExtensions
from MBrace.Core
namespace MBrace.Thespian
namespace MBrace.Flow
namespace Microsoft
namespace Microsoft.FSharp
namespace Microsoft.FSharp.Control
namespace Nessos
namespace Nessos.FsPickler
namespace Nessos.FsPickler.Json
type FsPickler =
static member CreateBsonSerializer : ?typeConverter:ITypeNameConverter -> BsonSerializer
static member CreateJsonSerializer : ?indent:bool * ?omitHeader:bool * ?typeConverter:ITypeNameConverter -> JsonSerializer
Full name: Nessos.FsPickler.Json.FsPickler
static member FsPickler.PickleToJson : x:'T -> string
Full name: Index.PickleToJson
val x : 'T
Multiple items
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = String
Full name: Microsoft.FSharp.Core.string
val failwith : message:string -> 'T
Full name: Microsoft.FSharp.Core.Operators.failwith
namespace XPlot
module GoogleCharts
from XPlot
val workA : obj
Full name: Index.workA
val workB : obj
Full name: Index.workB
val uris : Uri list
Full name: Index.uris
module Unchecked
from Microsoft.FSharp.Core.Operators
val defaultof<'T> : 'T
Full name: Microsoft.FSharp.Core.Operators.Unchecked.defaultof
Multiple items
type Uri =
new : uriString:string -> Uri + 5 overloads
member AbsolutePath : string
member AbsoluteUri : string
member Authority : string
member DnsSafeHost : string
member Equals : comparand:obj -> bool
member Fragment : string
member GetComponents : components:UriComponents * format:UriFormat -> string
member GetHashCode : unit -> int
member GetLeftPart : part:UriPartial -> string
...
Full name: System.Uri
--------------------
Uri(uriString: string) : unit
Uri(uriString: string, uriKind: UriKind) : unit
Uri(baseUri: Uri, relativeUri: string) : unit
Uri(baseUri: Uri, relativeUri: Uri) : unit
type 'T list = List<'T>
Full name: Microsoft.FSharp.Collections.list<_>
val ( ??? )<'T> : 'T
Full name: Index.( ??? )
val cluster : MBrace.Runtime.MBraceClient
Full name: Index.cluster
namespace MBrace.Runtime
Multiple items
type MBraceClient =
new : runtime:IRuntimeManager * defaultFaultPolicy:FaultPolicy -> MBraceClient
member AttachLogger : logger:ISystemLogger -> IDisposable
member ClearAllProcesses : unit -> unit
member ClearProcess : cloudProcess:CloudProcess<'T> -> unit
member ClearSystemLogs : unit -> unit
member ClearSystemLogsAsync : unit -> Async<unit>
member CreateCancellationTokenSource : ?parents:seq<ICloudCancellationToken> -> ICloudCancellationTokenSource
member CreateProcess : workflow:Cloud<'T> * ?cancellationToken:ICloudCancellationToken * ?faultPolicy:FaultPolicy * ?target:IWorkerRef * ?additionalResources:ResourceRegistry * ?taskName:string -> CloudProcess<'T>
member CreateProcessAsync : workflow:Cloud<'T> * ?cancellationToken:ICloudCancellationToken * ?faultPolicy:FaultPolicy * ?target:IWorkerRef * ?additionalResources:ResourceRegistry * ?taskName:string -> Async<CloudProcess<'T>>
member FormatProcesses : unit -> string
...
Full name: MBrace.Runtime.MBraceClient
--------------------
new : runtime:MBrace.Runtime.IRuntimeManager * defaultFaultPolicy:MBrace.Core.FaultPolicy -> MBrace.Runtime.MBraceClient
type ThreadPool =
static member BindHandle : osHandle:nativeint -> bool + 1 overload
static member GetAvailableThreads : workerThreads:int * completionPortThreads:int -> unit
static member GetMaxThreads : workerThreads:int * completionPortThreads:int -> unit
static member GetMinThreads : workerThreads:int * completionPortThreads:int -> unit
static member QueueUserWorkItem : callBack:WaitCallback -> bool + 1 overload
static member RegisterWaitForSingleObject : waitObject:WaitHandle * callBack:WaitOrTimerCallback * state:obj * millisecondsTimeOutInterval:uint32 * executeOnlyOnce:bool -> RegisteredWaitHandle + 3 overloads
static member SetMaxThreads : workerThreads:int * completionPortThreads:int -> bool
static member SetMinThreads : workerThreads:int * completionPortThreads:int -> bool
static member UnsafeQueueNativeOverlapped : overlapped:NativeOverlapped -> bool
static member UnsafeQueueUserWorkItem : callBack:WaitCallback * state:obj -> bool
...
Full name: System.Threading.ThreadPool
static member ThreadPool.QueueUserWorkItem : f:(unit -> unit) -> unit
Full name: Index.QueueUserWorkItem
val f : (unit -> unit)
type unit = Unit
Full name: Microsoft.FSharp.Core.unit
static member ThreadPool.QueueUserWorkItem : f:(unit -> unit) -> unit
ThreadPool.QueueUserWorkItem(callBack: WaitCallback) : bool
ThreadPool.QueueUserWorkItem(callBack: WaitCallback, state: obj) : bool
type WaitCallback =
delegate of obj -> unit
Full name: System.Threading.WaitCallback
val ignore : value:'T -> unit
Full name: Microsoft.FSharp.Core.Operators.ignore
val dataSizes : int list
Full name: Index.Wordcount.dataSizes
val clusterSizes : int list
Full name: Index.Wordcount.clusterSizes
val downloadTimes : float list
Full name: Index.Wordcount.downloadTimes
val wordCountTimes : float list
Full name: Index.Wordcount.wordCountTimes
val labels : string list
Full name: Index.Wordcount.labels
Multiple items
module List
from Microsoft.FSharp.Collections
--------------------
type List<'T> =
| ( [] )
| ( :: ) of Head: 'T * Tail: 'T list
interface IEnumerable
interface IEnumerable<'T>
member GetSlice : startIndex:int option * endIndex:int option -> 'T list
member Head : 'T
member IsEmpty : bool
member Item : index:int -> 'T with get
member Length : int
member Tail : 'T list
static member Cons : head:'T * tail:'T list -> 'T list
static member Empty : 'T list
Full name: Microsoft.FSharp.Collections.List<_>
val zip : list1:'T1 list -> list2:'T2 list -> ('T1 * 'T2) list
Full name: Microsoft.FSharp.Collections.List.zip
val map : mapping:('T -> 'U) -> list:'T list -> 'U list
Full name: Microsoft.FSharp.Collections.List.map
val c : int
val d : int
val sprintf : format:Printf.StringFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
val downloadData : (string * float) list
Full name: Index.Wordcount.downloadData
val wordCountData : (string * float) list
Full name: Index.Wordcount.wordCountData
val chart : GoogleChart
Full name: Index.Wordcount.chart
type Chart =
static member Annotation : data:seq<#seq<DateTime * 'V * string * string>> * ?Labels:seq<string> * ?Options:Options -> GoogleChart (requires 'V :> value)
static member Annotation : data:seq<DateTime * #value * string * string> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
static member Area : data:seq<#seq<'K * 'V>> * ?Labels:seq<string> * ?Options:Options -> GoogleChart (requires 'K :> key and 'V :> value)
static member Area : data:seq<#key * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
static member Bar : data:seq<#seq<'K * 'V>> * ?Labels:seq<string> * ?Options:Options -> GoogleChart (requires 'K :> key and 'V :> value)
static member Bar : data:seq<#key * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
static member Bubble : data:seq<string * #value * #value * #value * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
static member Bubble : data:seq<string * #value * #value * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
static member Bubble : data:seq<string * #value * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
static member Calendar : data:seq<DateTime * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
...
Full name: XPlot.GoogleCharts.Chart
static member Chart.Column : data:seq<#seq<'K * 'V>> * ?Labels:seq<string> * ?Options:Options -> GoogleChart (requires 'K :> key and 'V :> value)
static member Chart.Column : data:seq<#key * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
static member Chart.WithXTitle : xTitle:string -> chart:GoogleChart -> GoogleChart
static member Chart.WithYTitle : yTitle:string -> chart:GoogleChart -> GoogleChart
static member Chart.WithSize : size:(int * int) -> chart:GoogleChart -> GoogleChart
static member Chart.Show : chart:GoogleChart -> GoogleChart
val clusterSizes : int list
Full name: Index.KNN.clusterSizes
val times : float list
Full name: Index.KNN.times
val localLabel : string
Full name: Index.KNN.localLabel
val localTime : float
Full name: Index.KNN.localTime
val data : (string * float) list
Full name: Index.KNN.data
val sz : int
val t : float
val chart : GoogleChart
Full name: Index.KNN.chart
property GoogleChart.Html: string
val clusterSizes : int list
Full name: Index.Propan.clusterSizes
val times : float list
Full name: Index.Propan.times
val localLabel : string
Full name: Index.Propan.localLabel
val localTime : float
Full name: Index.Propan.localTime
val data : (string * float) list
Full name: Index.Propan.data
val chart : GoogleChart
Full name: Index.Propan.chart
val locData : (string * float) list
Full name: Index.Code.locData
val chart : GoogleChart
Full name: Index.Code.chart
static member Chart.Bar : data:seq<#seq<'K * 'V>> * ?Labels:seq<string> * ?Options:Options -> GoogleChart (requires 'K :> key and 'V :> value)
static member Chart.Bar : data:seq<#key * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
Multiple items
type ReflectedDefinitionAttribute =
inherit Attribute
new : unit -> ReflectedDefinitionAttribute
new : includeValue:bool -> ReflectedDefinitionAttribute
member IncludeValue : bool
Full name: Microsoft.FSharp.Core.ReflectedDefinitionAttribute
--------------------
new : unit -> ReflectedDefinitionAttribute
new : includeValue:bool -> ReflectedDefinitionAttribute
val add : x:'a -> y:'b -> 'c
Full name: Index.add
val x : 'a
val y : 'b
member MBrace.Runtime.MBraceClient.Run : workflow:MBrace.Core.Cloud<'T> * ?cancellationToken:MBrace.Core.ICloudCancellationToken * ?faultPolicy:MBrace.Core.FaultPolicy * ?target:MBrace.Core.IWorkerRef * ?additionalResources:ResourceRegistry * ?taskName:string -> 'T
type IContext =
interface
abstract member Resolve : unit -> 'TResource
end
Full name: Index.IContext
abstract member IContext.Resolve : unit -> 'TResource
Full name: Index.IContext.Resolve
Multiple items
type Cloud =
private new : unit -> Cloud
static member FromContinuations : body:(ExecutionContext -> Continuation<'T> -> unit) -> Cloud<'T>
static member GetExecutionContext : unit -> LocalCloud<ExecutionContext>
static member GetResource : unit -> LocalCloud<'TResource>
static member GetResourceRegistry : unit -> LocalCloud<ResourceRegistry>
static member RunSynchronously : workflow:Cloud<'T> * ?resources:ResourceRegistry * ?cancellationToken:ICloudCancellationToken -> 'T
static member Start : workflow:Cloud<unit> * ?resources:ResourceRegistry * ?cancellationToken:ICloudCancellationToken -> unit
static member StartAsTask : workflow:Cloud<'T> * ?resources:ResourceRegistry * ?cancellationToken:ICloudCancellationToken * ?taskCreationOptions:TaskCreationOptions -> Task<'T>
static member StartImmediate : workflow:Cloud<unit> * ?resources:ResourceRegistry * ?cancellationToken:ICloudCancellationToken -> unit
static member StartImmediateWithContinuations : workflow:Cloud<'T> * continuation:Continuation<'T> * context:ExecutionContext -> unit
...
Full name: MBrace.Core.Internals.Cloud
--------------------
type Cloud<'T> = IContext -> (IContext -> 'T -> unit) -> unit
Full name: Index.Cloud<_>
type IScheduler =
interface
abstract member Enqueue : (IContext -> unit) -> unit
end
Full name: Index.IScheduler
abstract member IScheduler.Enqueue : (IContext -> unit) -> unit
Full name: Index.IScheduler.Enqueue
Multiple items
type Cloud =
static member GetResource : unit -> Cloud<'TResource>
static member Parallel : children:Cloud<'T> [] -> Cloud<'T []>
Full name: Index.Cloud
--------------------
type Cloud<'T> = IContext -> (IContext -> 'T -> unit) -> unit
Full name: Index.Cloud<_>
Multiple items
static member Cloud.Parallel : children:Cloud<'T> [] -> Cloud<'T []>
Full name: Index.Cloud.Parallel
--------------------
type Parallel =
static member For : fromInclusive:int * toExclusive:int * body:Action<int> -> ParallelLoopResult + 11 overloads
static member ForEach<'TSource> : source:IEnumerable<'TSource> * body:Action<'TSource> -> ParallelLoopResult + 19 overloads
static member Invoke : [<ParamArray>] actions:Action[] -> unit + 1 overload
Full name: System.Threading.Tasks.Parallel
val children : Cloud<'T> []
val ctx : IContext
val k : (IContext -> 'T [] -> unit)
val sched : IScheduler
abstract member IContext.Resolve : unit -> 'TResource
val n : int ref
val ts : 'T []
Multiple items
val ref : value:'T -> 'T ref
Full name: Microsoft.FSharp.Core.Operators.ref
--------------------
type 'T ref = Ref<'T>
Full name: Microsoft.FSharp.Core.ref<_>
type Array =
member Clone : unit -> obj
member CopyTo : array:Array * index:int -> unit + 1 overload
member GetEnumerator : unit -> IEnumerator
member GetLength : dimension:int -> int
member GetLongLength : dimension:int -> int64
member GetLowerBound : dimension:int -> int
member GetUpperBound : dimension:int -> int
member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
member Initialize : unit -> unit
member IsFixedSize : bool
...
Full name: System.Array
val zeroCreate : count:int -> 'T []
Full name: Microsoft.FSharp.Collections.Array.zeroCreate
property Array.Length: int
val runChild : (int -> (IContext -> ('a -> 'T -> unit) -> unit) -> unit)
val i : int
val ch : (IContext -> ('a -> 'T -> unit) -> unit)
abstract member IScheduler.Enqueue : (IContext -> unit) -> unit
val t : 'T
type Interlocked =
static member Add : location1:int * value:int -> int + 1 overload
static member CompareExchange : location1:int * value:int * comparand:int -> int + 6 overloads
static member Decrement : location:int -> int + 1 overload
static member Exchange : location1:int * value:int -> int + 6 overloads
static member Increment : location:int -> int + 1 overload
static member Read : location:int64 -> int64
Full name: System.Threading.Interlocked
Interlocked.Increment(location: byref<int64>) : int64
Interlocked.Increment(location: byref<int>) : int
val iteri : action:(int -> 'T -> unit) -> array:'T [] -> unit
Full name: Microsoft.FSharp.Collections.Array.iteri
type IFileSystem =
interface
abstract member FileExists : path:string -> bool
end
Full name: Index.IFileSystem
abstract member IFileSystem.FileExists : path:string -> bool
Full name: Index.IFileSystem.FileExists
type bool = Boolean
Full name: Microsoft.FSharp.Core.bool
type CloudFile =
static member Exists : path:string -> Cloud<bool>
Full name: Index.CloudFile
static member CloudFile.Exists : path:string -> Cloud<bool>
Full name: Index.CloudFile.Exists
val path : string
val k : (IContext -> bool -> unit)
val fileSystem : IFileSystem
abstract member IFileSystem.FileExists : path:string -> bool
static member Cloud.GetResource : unit -> Cloud<'TResource>
Full name: Index.Cloud.GetResource
val k : (IContext -> 'TResource -> unit)
type IScheduler =
interface
abstract member Enqueue : ('a0 -> unit) -> unit
end
Full name: index.IScheduler
abstract member IScheduler.Enqueue : ('a0 -> unit) -> unit
Full name: index.IScheduler.Enqueue
Multiple items
type ThreadPoolContext =
interface IContext
new : unit -> ThreadPoolContext
Full name: Index.ThreadPoolContext
--------------------
new : unit -> ThreadPoolContext
override ThreadPoolContext.Resolve : unit -> 'T
Full name: Index.ThreadPoolContext.Resolve
Multiple items
type ThreadPoolScheduler =
interface IScheduler
new : unit -> ThreadPoolScheduler
Full name: Index.ThreadPoolScheduler
--------------------
new : unit -> ThreadPoolScheduler
val unbox : value:obj -> 'T
Full name: Microsoft.FSharp.Core.Operators.unbox
override ThreadPoolScheduler.Enqueue : job:(IContext -> unit) -> unit
Full name: Index.ThreadPoolScheduler.Enqueue
val job : (IContext -> unit)
type ThreadPool =
static member Run : workflow:Cloud<'T> -> 'T
Full name: Index.ThreadPool
static member ThreadPool.Run : workflow:Cloud<'T> -> 'T
Full name: Index.ThreadPool.Run
val workflow : Cloud<'T>
val tcs : TaskCompletionSource<'T>
Multiple items
type TaskCompletionSource<'TResult> =
new : unit -> TaskCompletionSource<'TResult> + 3 overloads
member SetCanceled : unit -> unit
member SetException : exception:Exception -> unit + 1 overload
member SetResult : result:'TResult -> unit
member Task : Task<'TResult>
member TrySetCanceled : unit -> bool
member TrySetException : exception:Exception -> bool + 1 overload
member TrySetResult : result:'TResult -> bool
Full name: System.Threading.Tasks.TaskCompletionSource<_>
--------------------
TaskCompletionSource() : unit
TaskCompletionSource(creationOptions: TaskCreationOptions) : unit
TaskCompletionSource(state: obj) : unit
TaskCompletionSource(state: obj, creationOptions: TaskCreationOptions) : unit
TaskCompletionSource.SetResult(result: 'T) : unit
property TaskCompletionSource.Task: Task<'T>
property Task.Result: 'T
Multiple items
type AzureScheduler =
interface IScheduler
new : unit -> AzureScheduler
Full name: Index.AzureScheduler
--------------------
new : unit -> AzureScheduler
override AzureScheduler.Enqueue : job:(IContext -> unit) -> unit
Full name: Index.AzureScheduler.Enqueue
val mkFunc : unit -> (int -> int)
Full name: Index.mkFunc
val rand : int
Multiple items
type Random =
new : unit -> Random + 1 overload
member Next : unit -> int + 2 overloads
member NextBytes : buffer:byte[] -> unit
member NextDouble : unit -> float
Full name: System.Random
--------------------
Random() : unit
Random(Seed: int) : unit
val x : int
static member FsPickler.PickleToJson : x:'T -> string
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
Multiple items
type CloudBuilder =
new : unit -> CloudBuilder
member Bind : f:Cloud<'T> * g:('T -> Cloud<'S>) -> Cloud<'S>
member Return : x:'T -> Cloud<'T>
member ReturnFrom : f:Cloud<'T> -> (IContext -> (IContext -> 'T -> unit) -> unit)
Full name: Index.CloudBuilder
--------------------
new : unit -> CloudBuilder
member CloudBuilder.Return : x:'T -> Cloud<'T>
Full name: Index.CloudBuilder.Return
val __ : CloudBuilder
member CloudBuilder.Bind : f:Cloud<'T> * g:('T -> Cloud<'S>) -> Cloud<'S>
Full name: Index.CloudBuilder.Bind
val f : Cloud<'T>
val g : ('T -> Cloud<'S>)
member CloudBuilder.ReturnFrom : f:Cloud<'T> -> (IContext -> (IContext -> 'T -> unit) -> unit)
Full name: Index.CloudBuilder.ReturnFrom
val cloud : CloudBuilder
Full name: Index.cloud
member MBrace.Runtime.MBraceClient.Run : t:Cloud<'T> -> 'T
Full name: Index.Run
val t : Cloud<'T>
val pickleCC : unit -> ctx:IContext -> cont:(IContext -> string -> unit) -> unit
Full name: Index.pickleCC
val cont : (IContext -> string -> unit)
val serializer : JsonSerializer
static member FsPickler.CreateJsonSerializer : ?indent:bool * ?omitHeader:bool * ?typeConverter:ITypeNameConverter -> JsonSerializer
val pickle : string
member FsPicklerTextSerializer.PickleToString : value:'T * ?pickler:Pickler<'T> * ?streamingContext:Runtime.Serialization.StreamingContext -> string
val dive : n:int -> Cloud<string>
Full name: Index.dive
val n : int
val c : string
member MBrace.Runtime.MBraceClient.Run : t:Cloud<'T> -> 'T
member MBrace.Runtime.MBraceClient.Run : workflow:MBrace.Core.Cloud<'T> * ?cancellationToken:MBrace.Core.ICloudCancellationToken * ?faultPolicy:MBrace.Core.FaultPolicy * ?target:MBrace.Core.IWorkerRef * ?additionalResources:ResourceRegistry * ?taskName:string -> 'T
type obj = Object
Full name: Microsoft.FSharp.Core.obj
namespace System.Reflection
type Assembly =
member CodeBase : string
member CreateInstance : typeName:string -> obj + 2 overloads
member EntryPoint : MethodInfo
member Equals : o:obj -> bool
member EscapedCodeBase : string
member Evidence : Evidence
member FullName : string
member GetCustomAttributes : inherit:bool -> obj[] + 1 overload
member GetCustomAttributesData : unit -> IList<CustomAttributeData>
member GetExportedTypes : unit -> Type[]
...
Full name: System.Reflection.Assembly
Common Misconceptions
- "MBrace is an F# only framework"
- "MBrace is based on F# quotations"
In the Past
1:
2:
3:
4:
|
/// Quotation-based variant: `add` carries [<ReflectedDefinition>] and the
/// call is handed to the cluster as a quoted expression.
[<ReflectedDefinition>]
let add x y =
    cloud {
        return x + y
    }
cluster.Run <@ add 1 2 @>
|
Now (F#)
1:
2:
3:
|
/// Builds a cloud workflow that returns the sum of its two arguments.
let add x y =
    cloud {
        return x + y
    }
// The workflow value itself is passed to the cluster; no quotation is involved.
cluster.Run (add 1 2)
|
Now (C#)
1:
2:
3:
|
// Plain C# method that returns the sum of its two arguments.
> double add(double x, double y) { return x + y; }
// Wraps the call in a lambda via CloudBuilder.FromFunc and passes the result to cluster.Run.
> cluster.Run(CloudBuilder.FromFunc(() => add(1,2)))
|
No interpretation or expression tree compilation!
MBrace.Core
A continuation-based library for
orchestrating distributed .NET code
Cloud<'T> = Async<'T> + context
1:
2:
|
/// Context abstraction exposing a single generic resource lookup.
type IContext =
    interface
        /// Produces a value of the requested resource type.
        abstract Resolve<'TResource> : unit -> 'TResource
    end
|
1:
|
/// A cloud workflow is a function of an IContext and a continuation.
/// The continuation receives an IContext together with the 'T result;
/// both the continuation and the workflow itself return unit.
type Cloud<'T> = IContext -> (IContext -> 'T -> unit) -> unit
|
1:
2:
|
/// Scheduler abstraction: accepts jobs that take an IContext and return unit.
type IScheduler =
    interface
        /// Submits a job to the scheduler.
        abstract Enqueue : (IContext -> unit) -> unit
    end
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
|
type Cloud =
    /// Enqueues every child workflow on the IScheduler resolved from the
    /// context and invokes the continuation `k` with the array of results
    /// (indexed like `children`) once every child has produced its value.
    /// An empty `children` array completes immediately with an empty array.
    static member Parallel(children:Cloud<'T>[]) : Cloud<'T[]> =
        fun ctx k ->
            let ts = Array.zeroCreate<'T> children.Length
            if ts.Length = 0 then
                // No child would ever trigger the continuation, so fire it here.
                k ctx ts
            else
                let sched = ctx.Resolve<IScheduler>()
                let n = ref 0
                let runChild i ch =
                    sched.Enqueue(fun ctx ->
                        ch ctx (fun _ t ->
                            ts.[i] <- t
                            // Count the completion inside the child's continuation:
                            // a child that continues asynchronously (for example a
                            // nested Parallel) must only be counted after its result
                            // has been stored, otherwise `k` could observe a hole.
                            if Interlocked.Increment n = ts.Length then k ctx ts))
                children |> Array.iteri runChild
|
Async vs. Cloud
- Async has hard-wired dependency on .NET thread pool
- Cloud scheduler parameterized by context
- Cloud parameterizable by other resources
1:
2:
3:
|
/// File-system abstraction; only one member is shown here.
type IFileSystem =
    interface
        /// Takes a path and answers with a bool (presumably whether the file
        /// exists, going by the member name; no implementation is shown).
        abstract FileExists : path:string -> bool
        (* ... *)
    end
|
1:
2:
3:
4:
5:
|
type CloudFile =
    /// Cloud workflow that resolves an IFileSystem from the context, asks it
    /// about `path`, and passes the answer to the continuation.
    static member Exists(path : string) : Cloud<bool> =
        fun ctx k ->
            ctx.Resolve<IFileSystem>().FileExists(path)
            |> k ctx
|
1:
2:
3:
|
type Cloud with
    /// Cloud workflow that resolves a 'TResource from the context and passes
    /// it to the continuation.
    static member GetResource<'TResource>() : Cloud<'TResource> =
        fun ctx k ->
            let resource = ctx.Resolve<'TResource> ()
            k ctx resource
|
1:
2:
|
/// Scheduler abstraction: accepts jobs that take an IContext and return unit.
type IScheduler =
    interface
        /// Submits a job to the scheduler.
        abstract Enqueue : (IContext -> unit) -> unit
    end
|
A thread pool scheduler
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
|
/// Context that answers every resource request with a thread pool scheduler.
type ThreadPoolContext() =
    interface IContext with
        // Always creates a fresh ThreadPoolScheduler; the unbox fails at run
        // time for any 'T the scheduler cannot be cast to.
        member __.Resolve<'T>() = ThreadPoolScheduler() |> unbox<'T>

/// Scheduler that queues each job on the thread pool, giving it a new
/// ThreadPoolContext.
and ThreadPoolScheduler() =
    interface IScheduler with
        member __.Enqueue(job : IContext -> unit) =
            ThreadPool.QueueUserWorkItem(fun () -> job (ThreadPoolContext()))

type ThreadPool =
    /// Runs a cloud workflow through a ThreadPoolScheduler and blocks the
    /// calling thread until the workflow's continuation delivers a result.
    /// An exception raised while the scheduled job runs the workflow is
    /// recorded on the task, so it surfaces from `Task.Result` instead of
    /// leaving the caller blocked forever.
    static member Run(workflow : Cloud<'T>) : 'T =
        let tcs = new TaskCompletionSource<'T>()
        let sched = new ThreadPoolScheduler() :> IScheduler
        sched.Enqueue(fun ctx ->
            try
                workflow ctx (fun _ t -> tcs.SetResult t)
            with e ->
                // TrySetException (not SetException): the task may already be
                // completed if the failure happened after SetResult.
                tcs.TrySetException e |> ignore)
        tcs.Task.Result
|
How does one implement a distributed scheduler?
1:
2:
3:
|
/// Skeleton of a distributed scheduler; the body of Enqueue is deliberately
/// left as the ``???`` placeholder.
type AzureScheduler() =
    interface IScheduler with
        member __.Enqueue(job : IContext -> unit) =
            ``???``
|
Requires the capacity to serialize arbitrary lambdas
What is the runtime representation of an F# lambda?
The code
1:
2:
3:
|
/// Draws one random integer and returns a function that adds that integer
/// to its argument.
let mkFunc () : int -> int =
    // `rand` is computed once per mkFunc call and captured by the lambda.
    let rand = System.Random().Next()
    fun x -> rand + x
|
Compiles to
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
|
// Compiler-generated closure class for the lambda returned by mkFunc,
// shown here as decompiled output.
[Serializable]
internal class mkFunc@5 : FSharpFunc<int, int>
{
// The captured local `rand` is stored as an instance field.
public int rand;
// The constructor receives the captured value and stores it in the field.
internal mkFunc@5(int rand)
{
this.rand = rand;
}
// The lambda body: adds the captured value to the argument.
public override int Invoke(int x)
{
return this.rand + x;
}
}
|
Remarks
- F# lambdas are instances of compiler-generated classes
- that inherit the abstract class `FSharpFunc<'T,'U>`
- Lambda body found in the `Invoke` method override
- Captured variables represented as instance fields
What does a serialized lambda look like?
1:
2:
3:
|
/// Draws one random integer and returns a function that adds that integer
/// to its argument.
let mkFunc () : int -> int =
    // `rand` is computed once per mkFunc call and captured by the lambda.
    let rand = System.Random().Next()
    fun x -> rand + x
|
1:
|
// Pickles the closure returned by mkFunc() (typed as int -> int) to a JSON string.
FsPickler.PickleToJson<int -> int>(mkFunc())
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
|
{
"subtype": {
"Case": "NamedType",
"Name": "FSI_0022+mkLambda@32-1",
"Assembly": {
"Name": "FSI-ASSEMBLY",
"Version": "0.0.0.0",
"Culture": "neutral",
"PublicKeyToken": ""
}
},
"instance": {
"rand": 824280236
}
}
|
Continuations can be extremely complex objects
The Code
1:
2:
3:
4:
5:
|
/// Cloud workflow that serializes its own continuation to indented JSON and
/// then resumes that continuation with the resulting string.
let pickleCC () : Cloud<string> =
    fun ctx cont ->
        FsPickler.CreateJsonSerializer(indent = true).PickleToString(cont)
        |> cont ctx
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
|
/// Recurses `n` levels through the cloud builder and, at the bottom, returns
/// the string produced by pickleCC.
/// Every level binds and re-returns the result on purpose: each bind adds a
/// layer to the continuation that pickleCC ends up serializing.
let rec dive n =
    cloud {
        if n = 0 then
            let! c = pickleCC ()
            return c
        else
            let! c = dive (n - 1)
            return c
    }
cluster.Run(dive 5)
|
MBrace uses FsPickler for serialization
FsPickler
- Messaging Serializer
- Multi-format
- Supports most "serializable" .NET object graphs
- Fast
- Pickler combinators
A .NET process cannot deserialize a lambda unless
it already knows about its corresponding class definition.
This holds trivially in "closed" distributed systems,
where every node contains a fixed set of assemblies.
Not so in "dynamic" systems, where code can
be generated at runtime or introduced after deployment.
Vagabond
- Runtime assembly dependency management library
- Lets .NET applications execute arbitrary code
- Incremental accumulation of client-defined code and data
- Built-in support for F# and C# REPLs
In essence, Vagabond is two methods:
1:
2:
|
/// Takes an object graph and returns an array of AssemblyPackage values
/// (presumably the assemblies the graph depends on, going by the name).
val computeDependencies : graph:obj -> AssemblyPackage []
/// Takes one AssemblyPackage and returns a System.Reflection.Assembly
/// (presumably the loaded assembly, going by the name).
val loadDependency : AssemblyPackage -> System.Reflection.Assembly
|
Implementing Azure Scheduler
- FsPickler/Vagabond for serialization
- Use Blob Storage for storing new assemblies
- Use Service Bus for queueing jobs
Implementing Azure Scheduler
1:
2:
3:
4:
5:
6:
7:
|
/// Scheduler sketch that ships a job in two steps: first its assembly
/// dependencies, then the pickled job itself.
type AzureScheduler() =
    interface IScheduler with
        member __.Enqueue(job:IContext -> unit) =
            // Step 1: compute the job's dependencies and persist them.
            Blob.PersistAssemblies(Vagabond.computeDependencies(job))
            // Step 2: pickle the job to JSON and enqueue it.
            ServiceBus.EnqueueJob(FsPickler.PickleToJson(job))
|
MBrace.Runtime
- Runtime foundation based on FsPickler/Vagabond.
- Common semantics/architecture for distributed runtimes.
- Easy runtime authoring.
Benefiting from FsPickler/Vagabond
- Akka.Net, Prajna, Mobius
- Your project!
A dive into Cloud<'T>