namespace System
namespace System.Threading
namespace System.Threading.Tasks
namespace System.Net
namespace MBrace
namespace MBrace.Core
module BuilderAsyncExtensions
from MBrace.Core
namespace MBrace.Thespian
namespace MBrace.Flow
namespace Microsoft
namespace Microsoft.FSharp
namespace Microsoft.FSharp.Control
namespace XPlot
module GoogleCharts
from XPlot
val workA : obj
Full name: Index.workA
val workB : obj
Full name: Index.workB
val dataSizes : int list
Full name: Index.Wordcount.dataSizes
val clusterSizes : int list
Full name: Index.Wordcount.clusterSizes
val downloadTimes : float list
Full name: Index.Wordcount.downloadTimes
val wordCountTimes : float list
Full name: Index.Wordcount.wordCountTimes
val labels : string list
Full name: Index.Wordcount.labels
Multiple items
module List
from Microsoft.FSharp.Collections
--------------------
type List<'T> =
| ( [] )
| ( :: ) of Head: 'T * Tail: 'T list
interface IEnumerable
interface IEnumerable<'T>
member GetSlice : startIndex:int option * endIndex:int option -> 'T list
member Head : 'T
member IsEmpty : bool
member Item : index:int -> 'T with get
member Length : int
member Tail : 'T list
static member Cons : head:'T * tail:'T list -> 'T list
static member Empty : 'T list
Full name: Microsoft.FSharp.Collections.List<_>
val zip : list1:'T1 list -> list2:'T2 list -> ('T1 * 'T2) list
Full name: Microsoft.FSharp.Collections.List.zip
val map : mapping:('T -> 'U) -> list:'T list -> 'U list
Full name: Microsoft.FSharp.Collections.List.map
val c : int
val d : int
val sprintf : format:Printf.StringFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
val downloadData : (string * float) list
Full name: Index.Wordcount.downloadData
val wordCountData : (string * float) list
Full name: Index.Wordcount.wordCountData
val chart : GoogleChart
Full name: Index.Wordcount.chart
type Chart =
static member Annotation : data:seq<#seq<DateTime * 'V * string * string>> * ?Labels:seq<string> * ?Options:Options -> GoogleChart (requires 'V :> value)
static member Annotation : data:seq<DateTime * #value * string * string> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
static member Area : data:seq<#seq<'K * 'V>> * ?Labels:seq<string> * ?Options:Options -> GoogleChart (requires 'K :> key and 'V :> value)
static member Area : data:seq<#key * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
static member Bar : data:seq<#seq<'K * 'V>> * ?Labels:seq<string> * ?Options:Options -> GoogleChart (requires 'K :> key and 'V :> value)
static member Bar : data:seq<#key * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
static member Bubble : data:seq<string * #value * #value * #value * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
static member Bubble : data:seq<string * #value * #value * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
static member Bubble : data:seq<string * #value * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
static member Calendar : data:seq<DateTime * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
...
Full name: XPlot.GoogleCharts.Chart
static member Chart.Column : data:seq<#seq<'K * 'V>> * ?Labels:seq<string> * ?Options:Options -> GoogleChart (requires 'K :> key and 'V :> value)
static member Chart.Column : data:seq<#key * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
static member Chart.WithXTitle : xTitle:string -> chart:GoogleChart -> GoogleChart
static member Chart.WithYTitle : yTitle:string -> chart:GoogleChart -> GoogleChart
static member Chart.WithSize : size:(int * int) -> chart:GoogleChart -> GoogleChart
static member Chart.Show : chart:GoogleChart -> GoogleChart
val clusterSizes : int list
Full name: Index.KNN.clusterSizes
val times : float list
Full name: Index.KNN.times
val localLabel : string
Full name: Index.KNN.localLabel
val localTime : float
Full name: Index.KNN.localTime
val data : (string * float) list
Full name: Index.KNN.data
val sz : int
val t : float
val chart : GoogleChart
Full name: Index.KNN.chart
property GoogleChart.Html: string
val clusterSizes : int list
Full name: Index.Propan.clusterSizes
val times : float list
Full name: Index.Propan.times
val localLabel : string
Full name: Index.Propan.localLabel
val localTime : float
Full name: Index.Propan.localTime
val data : (string * float) list
Full name: Index.Propan.data
val chart : GoogleChart
Full name: Index.Propan.chart
val locData : (string * float) list
Full name: Index.Code.locData
val chart : GoogleChart
Full name: Index.Code.chart
static member Chart.Bar : data:seq<#seq<'K * 'V>> * ?Labels:seq<string> * ?Options:Options -> GoogleChart (requires 'K :> key and 'V :> value)
static member Chart.Bar : data:seq<#key * #value> * ?Labels:seq<string> * ?Options:Options -> GoogleChart
val getLineCountAsync : uris:Uri list -> Async<int>
Full name: Index.getLineCountAsync
val uris : Uri list
Multiple items
type Uri =
new : uriString:string -> Uri + 5 overloads
member AbsolutePath : string
member AbsoluteUri : string
member Authority : string
member DnsSafeHost : string
member Equals : comparand:obj -> bool
member Fragment : string
member GetComponents : components:UriComponents * format:UriFormat -> string
member GetHashCode : unit -> int
member GetLeftPart : part:UriPartial -> string
...
Full name: System.Uri
--------------------
Uri(uriString: string) : unit
Uri(uriString: string, uriKind: UriKind) : unit
Uri(baseUri: Uri, relativeUri: string) : unit
Uri(baseUri: Uri, relativeUri: Uri) : unit
type 'T list = List<'T>
Full name: Microsoft.FSharp.Collections.list<_>
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val get : (Uri -> Async<int>)
val uri : Uri
val wc : WebClient
Multiple items
type WebClient =
inherit Component
new : unit -> WebClient
member BaseAddress : string with get, set
member CachePolicy : RequestCachePolicy with get, set
member CancelAsync : unit -> unit
member Credentials : ICredentials with get, set
member DownloadData : address:string -> byte[] + 1 overload
member DownloadDataAsync : address:Uri -> unit + 1 overload
member DownloadFile : address:string * fileName:string -> unit + 1 overload
member DownloadFileAsync : address:Uri * fileName:string -> unit + 1 overload
member DownloadString : address:string -> string + 1 overload
...
Full name: System.Net.WebClient
--------------------
WebClient() : unit
val lines : string
member WebClient.AsyncDownloadString : address:Uri -> Async<string>
property String.Length: int
val sizes : int []
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Parallel : computations:seq<Async<'T>> -> Async<'T []>
val u : Uri
type Array =
member Clone : unit -> obj
member CopyTo : array:Array * index:int -> unit + 1 overload
member GetEnumerator : unit -> IEnumerator
member GetLength : dimension:int -> int
member GetLongLength : dimension:int -> int64
member GetLowerBound : dimension:int -> int
member GetUpperBound : dimension:int -> int
member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
member Initialize : unit -> unit
member IsFixedSize : bool
...
Full name: System.Array
val sum : array:'T [] -> 'T (requires member ( + ) and member get_Zero)
Full name: Microsoft.FSharp.Collections.Array.sum
val getLineCount : cloudfiles:string list -> 'a
Full name: Index.getLineCount
val cloudfiles : string list
Multiple items
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = String
Full name: Microsoft.FSharp.Core.string
type Parallel =
static member For : fromInclusive:int * toExclusive:int * body:Action<int> -> ParallelLoopResult + 11 overloads
static member ForEach<'TSource> : source:IEnumerable<'TSource> * body:Action<'TSource> -> ParallelLoopResult + 19 overloads
static member Invoke : [<ParamArray>] actions:Action[] -> unit + 1 overload
Full name: System.Threading.Tasks.Parallel
val loop : counter:int ref -> inbox:MailboxProcessor<int> -> Async<'a>
Full name: Index.loop
val counter : int ref
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
Multiple items
val ref : value:'T -> 'T ref
Full name: Microsoft.FSharp.Core.Operators.ref
--------------------
type 'T ref = Ref<'T>
Full name: Microsoft.FSharp.Core.ref<_>
val inbox : MailboxProcessor<int>
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val msg : int
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val counter : int ref
Full name: Index.counter
val actor : MailboxProcessor<int>
Full name: Index.actor
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val i : int32
member MailboxProcessor.Post : message:'Msg -> unit
property Ref.Value: int
val parMap : f:('T -> 'S) -> inputs:'T list -> 'a
Full name: Index.parMap
val f : ('T -> 'S)
val inputs : 'T list
property List.Length: int
val append : list1:'T list -> list2:'T list -> 'T list
Full name: Microsoft.FSharp.Collections.List.append
Multiple items
module CloudFlow
from MBrace.Flow
--------------------
type CloudFlow =
static member OfArray : source:'T [] -> CloudFlow<'T>
static member OfCloudArrays : cloudArrays:seq<#CloudArray<'T>> -> LocalCloud<PersistedCloudFlow<'T>>
static member OfCloudCollection : collection:ICloudCollection<'T> * ?sizeThresholdPerWorker:(unit -> int64) -> CloudFlow<'T>
static member OfCloudDirectory : dirPath:string * serializer:ISerializer * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
static member OfCloudDirectory : dirPath:string * ?deserializer:(Stream -> seq<'T>) * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
static member OfCloudDirectory : dirPath:string * deserializer:(TextReader -> seq<'T>) * ?encoding:Encoding * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
static member OfCloudDirectoryByLine : dirPath:string * ?encoding:Encoding * ?sizeThresholdPerCore:int64 -> CloudFlow<string>
static member OfCloudFileByLine : path:string * ?encoding:Encoding -> CloudFlow<string>
static member OfCloudFileByLine : paths:seq<string> * ?encoding:Encoding * ?sizeThresholdPerCore:int64 -> CloudFlow<string>
static member OfCloudFiles : paths:seq<string> * serializer:ISerializer * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
...
Full name: MBrace.Flow.CloudFlow
--------------------
type CloudFlow<'T> =
interface
abstract member WithEvaluators : collectorFactory:LocalCloud<Collector<'T,'S>> -> projection:('S -> LocalCloud<'R>) -> combiner:('R [] -> LocalCloud<'R>) -> Cloud<'R>
abstract member DegreeOfParallelism : int option
end
Full name: MBrace.Flow.CloudFlow<_>
static member CloudFlow.OfHttpFileByLine : urls:seq<string> * ?encoding:Text.Encoding -> CloudFlow<string>
static member CloudFlow.OfHttpFileByLine : url:string * ?encoding:Text.Encoding -> CloudFlow<string>
val collect : f:('T -> #seq<'R>) -> flow:CloudFlow<'T> -> CloudFlow<'R>
Full name: MBrace.Flow.CloudFlow.collect
val line : string
String.Split([<ParamArray>] separator: char []) : string []
String.Split(separator: string [], options: StringSplitOptions) : string []
String.Split(separator: char [], options: StringSplitOptions) : string []
String.Split(separator: char [], count: int) : string []
String.Split(separator: string [], count: int, options: StringSplitOptions) : string []
String.Split(separator: char [], count: int, options: StringSplitOptions) : string []
val map : f:('T -> 'R) -> flow:CloudFlow<'T> -> CloudFlow<'R>
Full name: MBrace.Flow.CloudFlow.map
val word : string
String.Trim() : string
String.Trim([<ParamArray>] trimChars: char []) : string
val filter : predicate:('T -> bool) -> flow:CloudFlow<'T> -> CloudFlow<'T>
Full name: MBrace.Flow.CloudFlow.filter
val countBy : projection:('T -> 'Key) -> flow:CloudFlow<'T> -> CloudFlow<'Key * int64> (requires equality)
Full name: MBrace.Flow.CloudFlow.countBy
val id : x:'T -> 'T
Full name: Microsoft.FSharp.Core.Operators.id
val sortByDescending : projection:('T -> 'Key) -> takeCount:int -> flow:CloudFlow<'T> -> CloudFlow<'T> (requires comparison)
Full name: MBrace.Flow.CloudFlow.sortByDescending
val count : int64
val toArray : flow:CloudFlow<'T> -> MBrace.Core.Cloud<'T []>
Full name: MBrace.Flow.CloudFlow.toArray
MBrace
REPL driven scalable computation
Eirik Tsarpalis
About Nessos
- ISV based in Athens, Greece
- Azure, F# and .NET experts
- Open source projects
F#'s most significant intellectual export
async { ... }
Now introducing
cloud { ... }
Agenda
- Programming Model
- Demo Time
- Case Studies
- Project Overview
From Async ...
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
|
/// Downloads every URI in parallel and returns the total number of text
/// lines across all downloaded documents.
let getLineCountAsync (uris : Uri list) = async {
    // Counts lines the way a line reader does: "\n", "\r" and "\r\n" each end
    // a line, and a trailing terminator does not add an extra empty line.
    let countLines (text : string) =
        use reader = new System.IO.StringReader(text)
        let rec go n =
            match reader.ReadLine() with
            | null -> n
            | _ -> go (n + 1)
        go 0
    let get (uri : Uri) = async {
        use wc = new WebClient()
        let! content = wc.AsyncDownloadString uri
        // String.Length would be the character count, not the line count.
        return countLines content
    }
    let! sizes = Async.Parallel [for u in uris -> get u]
    return Array.sum sizes
}
|
to Cloud
1:
2:
3:
4:
5:
6:
7:
8:
9:
|
/// Cloud counterpart of getLineCountAsync: sums a per-file count over a list of cloud file paths.
let getLineCount (cloudfiles : string list) = cloud {
// Per-file workflow: returns `Length` of whatever CloudFile.ReadAllLines yields
// (presumably the file's lines as an array -- confirm against the MBrace API).
let get (cloudfile : string) = cloud {
let! lines = CloudFile.ReadAllLines cloudfile
return lines.Length
}
// One `get` workflow per path is handed to Cloud.Parallel; the results are bound as `sizes`.
let! sizes = Cloud.Parallel [for f in cloudfiles -> get f]
return Array.sum sizes
}
|
From Async ...
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
|
// Actor body: receives one int from the mailbox, adds it to the shared ref cell, then recurses.
let rec loop (counter : int ref) (inbox : MailboxProcessor<int>) = async {
let! msg = inbox.Receive()
counter := !counter + msg
return! loop counter inbox
}
// Shared accumulator, mutated only inside `loop`.
let counter = ref 0
// Starts a mailbox processor running `loop` against that accumulator.
let actor = MailboxProcessor.Start (loop counter)
// Posts the integers 1 through 100 to the actor.
for i in 1 .. 100 do actor.Post i
// NOTE(review): Post returns unit and nothing here waits for the mailbox to drain,
// so the value read below may not yet include every posted message -- confirm intent.
!counter
|
to Cloud
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
|
// Cloud version of the actor counter: a CloudQueue stands in for the mailbox
// and a CloudAtom for the ref cell.
cloud {
// Consumer loop: dequeues one int, adds it to the atom via UpdateAsync, then recurses.
let rec loop (counter : CloudAtom<int>) (queue : CloudQueue<int>) = cloud {
let! msg = queue.DequeueAsync()
do! counter.UpdateAsync(fun c -> c + msg)
return! loop counter queue
}
// Atom created with an initial value of 0, plus a new queue of ints.
let! counter = CloudAtom.New<int>(initial = 0)
let! queue = CloudQueue.New<int> ()
// Starts the consumer loop as a cloud process; the returned handle is discarded.
let! _ = Cloud.CreateProcess(loop counter queue)
// Enqueues the integers 1 through 100.
for i in 1 .. 100 do queue.Enqueue i
// NOTE(review): nothing here waits for the consumer to process the queue, so the
// value read below may not yet reflect every enqueued item -- confirm intent.
return counter.Value
}
|
MBrace.Core
- Async on Steroids
- Declarative, higher-order, composable
- Vendor-agnostic distributed computation
- Integrated storage and messaging abstractions
A TPL for the Cloud
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
|
// Runs workA and workB as two cloud processes sharing one cancellation token and
// returns the result of Cloud.WhenAny over both.
cloud {
let! cts = Cloud.CreateCancellationTokenSource()
try
// Both processes are created with the same token, so a single Cancel() reaches both.
let! procA = Cloud.CreateProcess(workA, cancellationToken = cts.Token)
let! procB = Cloud.CreateProcess(workB, cancellationToken = cts.Token)
// Presumably yields whichever process completes first -- confirm against the MBrace API.
return! Cloud.WhenAny(procA, procB)
finally
// The finally block cancels the shared token on every exit path, success or failure.
cts.Cancel()
}
|
User-level scheduling
1:
2:
3:
4:
5:
6:
7:
8:
9:
|
/// Maps `f` over `inputs` by splitting the list between two workers and
/// concatenating the two partial results in their original order.
let parMap (f : 'T -> 'S) (inputs : 'T list) = cloud {
// The array pattern matches only when exactly two workers are returned;
// any other worker count fails the pattern match at runtime.
let! [|beefyWorkstation ; weakLaptop|] = Cloud.GetAvailableWorkers()
// Split index: two thirds of the input length, using integer arithmetic.
let C = inputs.Length * 2 / 3
// Sequential map wrapped in a cloud workflow so it can be run as a process.
let seqMap ts = cloud { return List.map f ts }
// The first C elements are targeted at beefyWorkstation, the remainder at weakLaptop.
let! procA = Cloud.CreateProcess(seqMap inputs.[..C-1], target = beefyWorkstation)
let! procB = Cloud.CreateProcess(seqMap inputs.[C..], target = weakLaptop)
do! Cloud.WhenAll(procA, procB)
// procA's results are placed before procB's, preserving the input order.
return List.append procA.Result procB.Result
}
|
MBrace.Core recap
- TPL for the cloud
- User-level scheduling
- Building block for cloud libraries
MBrace.Flow
- Streaming library for MBrace
- Built entirely on top of MBrace.Core
- Runtime agnostic
- Similar to Spark Streaming
- Based on Java 8 Streams
Flow example
1:
2:
3:
4:
5:
6:
7:
8:
|
// Word-frequency pipeline over a single HTTP-hosted text file.
[ "http://dumps.wikimedia.org/other/static_html_dumps/current/en/html.lst" ]
// Turns the URL list into a flow of text lines.
|> CloudFlow.OfHttpFileByLine
// Splits each line on single spaces and flattens the pieces into one flow of words.
|> CloudFlow.collect (fun line -> line.Split(' '))
|> CloudFlow.map (fun word -> word.Trim())
// Keeps only words longer than three characters.
|> CloudFlow.filter (fun word -> word.Length > 3)
// Groups identical words, producing (word, count) pairs.
|> CloudFlow.countBy id
// Orders by count, descending; the second argument (100) is the take count.
|> CloudFlow.sortByDescending (fun (_,count) -> count) 100
// Converts the flow into a cloud workflow that yields an array.
|> CloudFlow.toArray
|
MBrace Runtimes
- MBrace.ThreadPool
- Scheduled on the .NET thread pool
- In-memory storage and communication
- MBrace.Thespian
- On-premises standalone workers
- Shared file system storage
- Communication using Thespian actors
- MBrace.Azure
- Worker roles
- Azure Blob/Table storage
- Communication using Service Bus
Profile
- Data Source: http://www.textfiles.com
- MBrace.Azure cluster using A3 (Quad core) workers
- Tested clusters of 4, 8, 16, 32 and 44 workers
- Input size scaled proportionally to cluster size
Profile
- Data Source: Kaggle Digit Recognizer
- Uses @brandewinder's KNN-based implementation
- Local Quad Core i7 CPU @ 3.5GHz
- MBrace.Azure cluster using A3 (Quad core) workers
- Tested clusters of 4, 8, 16 and 32 workers
- Input size constant across cluster sizes
The Problem
- Core simulations performed using legacy code
- Fortran based, single threaded, command line
- Simulation code difficult to modify
- One file per execution
- Millions of files per simulation
The Solution
- Scale out simulation using MBrace
- Deploy legacy code across an Azure cluster
- Store input and output files in blob storage
- 200 lines of F# code
Performance
- Tested with 1000 sample input files
- Each file ~10K, taking ~20sec to process
- MBrace.Azure cluster using A3 (Quad core) workers
- Tested clusters of 4, 8, 16 and 32 workers
- Input size constant across cluster sizes
MBrace
- Open sourced one year ago
- Has transformed into a different project
- No longer exclusively maintained by Nessos
- Final steps towards 1.0 milestone
- Currently in beta
The MBrace Stack

How can I contribute?
- MBrace libraries
- MBrace runtimes
- MBrace for C#
- Documentation