Concurrent Stream Processing

Alex Miller - @puredanger
Revelytix - http://revelytix.com
Contents
• Query execution - the problem
• Plan representation - plans in our program
• Processing components - building blocks
• Processing execution - executing plans




Query Execution
Relational Data & Queries


 SELECT NAME
 FROM PERSON
 WHERE AGE > 20

 NAME      AGE
 Joe       30

RDF
"Resource Description Framework" - a fine-
grained graph representation of data
 [Graph: http://data/Joe --http://demo/age--> 30; http://data/Joe --http://demo/name--> "Joe"]




Subject              Predicate                                Object

 http://data/Joe      http://demo/age                            30

 http://data/Joe      http://demo/name                           "Joe"
SPARQL queries
SPARQL is a query language for RDF

 PREFIX demo: <http://demo/>
 SELECT ?name
 WHERE {
   ?person demo:age ?age.      # a "triple pattern"
   ?person demo:name ?name.    # natural join on ?person
   FILTER (?age > 20)
 }

Relational-to-RDF
• W3C R2RML mappings define how to virtually
  map a relational db into RDF
  SPARQL query:

  PREFIX demo: <http://demo/>
  SELECT ?name
  WHERE {
      ?person demo:age ?age.
      ?person demo:name ?name.
      FILTER (?age > 20)
  }

  SQL query:

  SELECT NAME
  FROM PERSON
  WHERE AGE > 20

  NAME   AGE
  Joe    30

  [Graph: http://data/Joe --http://demo/age--> 30; http://data/Joe --http://demo/name--> "Joe"]

Enterprise federation
• Model domain at enterprise level
• Map into data sources
• Federate across the enterprise (and beyond)
 [Diagram: SPARQL queries go to an enterprise-level SPARQL endpoint, which federates across several SPARQL endpoints, each backed by a SQL data source]

Query pipeline
• How does a query engine work?


 SQL --> Parse --(AST)--> Plan --(Plan)--> Resolve --(Plan)--> Optimize --(Plan)--> Process --> Results!
 Metadata --> Resolve

Trees!


 The AST and every plan passed between the pipeline stages (Parse, Plan, Resolve, Optimize, Process) is a tree.

Plan Representation
SQL query plans
                      SELECT Name, DeptName
                      FROM Person, Dept
                      WHERE Person.DeptID = Dept.DeptID
                       AND Age > 20

  Person (Name, Age, DeptID) --+
                               +--> join (DeptID) --> filter (Age > 20) --> project (Name, DeptName)
  Dept (DeptID, DeptName) -----+

SPARQL query plans
                            SELECT ?Name
                            WHERE {
                              ?Person :Name ?Name .
                              ?Person :Age ?Age .
                              FILTER (?Age > 20)
                            }
  TP1 { ?Person :Name ?Name } --+
                                +--> join (?Person) --> filter (?Age > 20) --> project (?Name)
  TP2 { ?Person :Age ?Age } ----+

Common model
Streams of tuples flowing through a network of
processing nodes



  node --+
         +--> node --> node --> node
  node --+

What kind of nodes?
• Tuple generators (leaves)
 – In SQL: a table or view
 – In SPARQL: a triple pattern
• Combinations (multiple children)
 – Join
 – Union
• Transformations
 – Filter
 – Duplicate removal
 – Sort
 – Grouping
 – Project
 – Slice (limit / offset)
 – etc.
Representation
Tree data structure with nodes and attributes
 In Java, PlanNode and its node types, with their attributes:

 PlanNode: childNodes
   TableNode: Table
   JoinNode: joinType, joinCriteria
   FilterNode: criteria
   ProjectNode: projectExpressions
   SliceNode: limit, offset

s-expressions
Tree data structure with nodes and attributes



            (*
                 (+ 2 3)
                 (- 6 5) )
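
As a small illustration (a sketch, not taken from the talk), the expression above can be held as quoted data, taken apart like any other list, and then evaluated. The names `expr`, `operator`, `operands`, and `result` are only for this example.

```clojure
;; Quoting keeps the expression as a list (data) instead of evaluating it.
(def expr '(* (+ 2 3) (- 6 5)))

;; The tree can be taken apart with ordinary sequence functions.
(def operator (first expr))   ; the symbol *
(def operands (rest expr))    ; the two sub-expressions

;; eval turns the same data back into a computation: (* 5 1)
(def result (eval expr))
```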




List representation
Tree data structure with nodes and attributes


 (project+ [Name DeptName]
   (filter+ (> Age 20)
     (join+
       (table+ Empl [Name Age DeptID])
       (table+ Dept [DeptID DeptName]))))
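
Because the plan is an ordinary nested list, generic tree functions work on it. The sketch below quotes the plan so it stays data and collects its leaf tables. `table-names` is a hypothetical helper, not part of the system described in the talk.

```clojure
;; The plan from the slide, quoted so that it stays a plain nested list.
(def plan
  '(project+ [Name DeptName]
     (filter+ (> Age 20)
       (join+
         (table+ Empl [Name Age DeptID])
         (table+ Dept [DeptID DeptName])))))

(defn table-names
  "Returns the table name of every table+ leaf, in depth-first order."
  [plan]
  (->> (tree-seq seq? seq plan)                       ; walk every nested list
       (filter #(and (seq? %) (= 'table+ (first %)))) ; keep the table+ nodes
       (map second)))                                 ; second element is the name
```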




Query optimization
Example - pushing criteria down

 (project+ [Name DeptName]
   (filter+ (> Age 20)
     (join+
       (project+ [Name Age DeptID]
         (bind+ [Age (- (now) Birth)]
           (table+ Empl [Name Birth DeptID])))
       (table+ Dept [DeptID DeptName]))))


Query optimization
Example - rewritten

 (project+ [Name DeptName]
   (join+
     (project+ [Name DeptID]
       (filter+ (> (- (now) Birth) 20)
         (table+ Empl [Name Birth DeptID])))
     (table+ Dept [DeptID DeptName])))
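
One step of this rewrite is replacing the bound variable Age in the filter criteria with its defining expression, so the filter can sit directly on the table. A minimal sketch of that single step, assuming bindings arrive as a bind+ style vector; `inline-bindings` is a hypothetical helper, not the optimizer from the talk.

```clojure
(require '[clojure.walk :as walk])

(defn inline-bindings
  "Replaces each bound variable in expr with its defining expression.
   bindings is a bind+ style vector: [var expr var expr ...]."
  [bindings expr]
  (walk/postwalk-replace (apply hash-map bindings) expr))

;; The criteria (> Age 20) rewritten in terms of the Birth column.
(def pushed-criteria
  (inline-bindings '[Age (- (now) Birth)] '(> Age 20)))
```

`pushed-criteria` is the list `(> (- (now) Birth) 20)`, which matches the filter+ criteria in the rewritten plan.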




Hash join conversion

 Before:  left tree, right tree --> join+
 After:   left tree --> preduce+ (hash-tuples) --> first+ --> hashes
          hashes, right tree --> let+ (mapcat tuple-matches)

Hash join conversion
 (join+ _left _right)

 is rewritten to:

 ;; Hash the left input once, then probe the hashes with each right tuple.
 (let+
   [hashes (first+
             (preduce+ (hash-tuple join-vars {}
                                   #(merge-with concat %1 %2))
                       _left))]
   (mapcat (fn [tuple]
             (tuple-matches hashes join-vars tuple))
           _right))
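
The same build-then-probe idea can be sketched on plain in-memory sequences of maps. The names `hash-tuples` and `tuple-matches` come from the slides, but their bodies and argument lists here are assumptions, and ordinary Clojure sequence functions stand in for the `preduce+`, `first+`, and `let+` stream operators.

```clojure
(defn hash-tuples
  "Build phase: index the left tuples by their join-variable values."
  [join-vars tuples]
  (group-by #(select-keys % join-vars) tuples))

(defn tuple-matches
  "Probe phase: merge one right tuple with every left tuple sharing its key."
  [hashes join-vars tuple]
  (map #(merge % tuple)
       (get hashes (select-keys tuple join-vars))))

(defn hash-join [join-vars left right]
  (let [hashes (hash-tuples join-vars left)]
    (mapcat #(tuple-matches hashes join-vars %) right)))

;; Example data: only ?Person 1 appears on both sides.
(def joined
  (hash-join [:Person]
             [{:Person 1 :Name "Joe"} {:Person 2 :Name "Ann"}]
             [{:Person 1 :Age 30} {:Person 3 :Age 41}]))
```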

Processing trees
 • Compile abstract nodes into more concrete
   stream operations:
 – map+, mapcat+, filter+
 – pmap+, pmapcat+, pfilter+, preduce+
 – first+, mux+
 – let+, let-stream+
 – number+, reorder+, rechunk+
 – pmap-chunk+, preduce-chunk+
Summary
• SPARQL and SQL query plans have essentially
  the same underlying algebra
• Model is a tree of nodes where tuples flow from
  leaves to the root
• A natural representation of this tree in Clojure is
  as a tree of s-expressions, just like our code
• We can manipulate this tree to provide
  – Optimizations
  – Differing levels of abstraction


Processing Components
Pipes
Pipes are streams of data



Producer --> Pipe --> Consumer

Producer side                     Consumer side
(enqueue pipe item)               (dequeue pipe item)
(enqueue-all pipe items)          (dequeue-all pipe items)
(close pipe)                      (closed? pipe)
(error pipe exception)            (error? pipe)
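
To make the producer/consumer contract concrete, here is a minimal sketch of a pipe as a ref holding a persistent queue. It is not the implementation from the talk: `make-pipe` is a hypothetical constructor, and the argument lists (for example, `dequeue` taking only the pipe) are simplifications made for this example.

```clojure
(defn make-pipe
  "A pipe is a ref holding a FIFO queue of items and a closed flag."
  []
  (ref {:items clojure.lang.PersistentQueue/EMPTY :closed false}))

(defn enqueue [pipe item]
  (dosync (alter pipe update :items conj item))
  pipe)

(defn dequeue
  "Removes and returns the oldest item, or nil when the pipe is empty."
  [pipe]
  (dosync
    (let [item (peek (:items @pipe))]
      (alter pipe update :items pop)
      item)))

(defn close [pipe]
  (dosync (alter pipe assoc :closed true))
  pipe)

(defn closed? [pipe]
  (:closed @pipe))
```

Usage: after `(-> (make-pipe) (enqueue :a) (enqueue :b))`, successive `dequeue` calls return `:a`, then `:b`, then `nil`.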



Pipe callbacks
Events on the pipe trigger callbacks which are
executed on the caller's thread







1. (add-callback pipe callback-fn)
2. (enqueue pipe "foo")
3. (callback-fn "foo") ;; during enqueue
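
The three steps above can be sketched as follows. This is an illustrative stand-in, not the talk's code: the pipe here is just an atom holding items and callbacks, and `make-pipe` is a hypothetical constructor. The point it shows is that the callback runs inside `enqueue`, on the thread that called it.

```clojure
(defn make-pipe []
  (atom {:items [] :callbacks []}))

(defn add-callback [pipe callback-fn]
  (swap! pipe update :callbacks conj callback-fn)
  pipe)

(defn enqueue [pipe item]
  (let [state (swap! pipe update :items conj item)]
    ;; Callbacks are invoked here, so they run on the enqueuing thread.
    (doseq [callback-fn (:callbacks state)]
      (callback-fn item))
    pipe))

;; Record each item together with the thread the callback ran on.
(def seen (atom []))
(def pipe (make-pipe))
(add-callback pipe (fn [item] (swap! seen conj [item (Thread/currentThread)])))
(enqueue pipe "foo")
```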



Pipes
Pipes are thread-safe functional data structures




Batched tuples
• To a pipe, data is just data. We actually pass
  data in batches through the pipe for efficiency.


                         [
                             {:Name "Alex" :Eyes "Blue"}
                             {:Name "Jeff" :Eyes "Brown"}
                             {:Name "Eric" :Eyes "Hazel"}
                             {:Name "Joe"  :Eyes "Blue"}
                             {:Name "Lisa" :Eyes "Blue"}
                             {:Name "Glen" :Eyes "Brown"}
                         ]
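
A small sketch of batching, using the tuples above: the stream is cut into fixed-size vectors, and a processing step then works on one whole batch at a time. The batch size of 4 and the helper names `batches` and `blue-eyed` are assumptions for illustration.

```clojure
(def tuples
  [{:Name "Alex" :Eyes "Blue"}
   {:Name "Jeff" :Eyes "Brown"}
   {:Name "Eric" :Eyes "Hazel"}
   {:Name "Joe"  :Eyes "Blue"}
   {:Name "Lisa" :Eyes "Blue"}
   {:Name "Glen" :Eyes "Brown"}])

(defn batches
  "Cuts a sequence of tuples into vectors of at most batch-size tuples."
  [batch-size tuples]
  (map vec (partition-all batch-size tuples)))

(defn blue-eyed
  "A per-batch processing step: one call handles a whole batch."
  [batch]
  (filterv #(= "Blue" (:Eyes %)) batch))

(def result (mapv blue-eyed (batches 4 tuples)))
```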




Pipe multiplexer
Compose multiple pipes into one




Pipe tee
Send output to multiple destinations




Nodes
• Nodes transform tuples from the input pipe and
  put the results on the output pipe.

 Input Pipe --> Node (fn) --> Output Pipe

 Each node holds:
 • input-pipe
 • output-pipe
 • task-fn
 • state
 • concurrency
Processing Trees
• Tree of nodes and pipes



 [Diagram: a tree of fn nodes connected by pipes; data flows from the leaf nodes on the left to the root on the right]

SPARQL query example
                       SELECT ?Name
                       WHERE {
                         ?Person :Name ?Name .
                         ?Person :Age ?Age .
                         FILTER (?Age > 20)
                       }
  TP1 { ?Person :Name ?Name } --+
                                +--> join (?Person) --> filter (?Age > 20) --> project (?Name)
  TP2 { ?Person :Age ?Age } ----+

 (project+ [?Name]
   (filter+ (> ?Age 20)
     (join+ [?Person]
       (triple+ [?Person :Name ?Name])
       (triple+ [?Person :Age ?Age]))))
Processing tree
 TP1 { ?Person :Name ?Name } --> preduce+ (hash-tuples) --> first+ --> hashes
 hashes, TP2 { ?Person :Age ?Age } --> let+ (mapcat tuple-matches) --> filter (?Age > 20) --> project (?Name)
Mapping to nodes
• An obvious mapping to nodes and pipes
 triple pattern (fn) --> preduce+ (fn) --> first+ (fn) --> hashes
 hashes, triple pattern (fn) --> let+ (fn) --> filter+ (fn) --> project+ (fn)
Mapping to nodes
• Choosing between compilation and evaluation
 triple pattern (fn) --> preduce+ (fn) --> first+ (fn) --> hashes
 hashes, triple pattern (fn) --> let+ (fn) --> eval (fn): filter (?Age > 20), project (?Name)
Compile vs eval
• We can evaluate our expressions
 – Directly on streams of Clojure data using Clojure
 – Indirectly via pipes and nodes (more on that next)
• The final step before processing decides, per plan node:
 – Plan nodes that combine data are real nodes
 – Plan nodes that allow parallelism (p*) are real nodes
 – Most other plan nodes can be merged into a single eval
 – Many leaf nodes are actually rolled up and sent to a database
 – Lots more work to do on where these splits occur
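
One way to picture merging plan nodes into a single eval is composing the steps so that they run in one pass over each chunk, with no pipe between them. The sketch below uses Clojure transducers as a stand-in; it is an analogy chosen for this example, not the compiler described in the talk, and all names are hypothetical.

```clojure
;; Two separate plan steps, written as transducers.
(def filter-step  (filter #(> (:Age %) 20)))
(def project-step (map #(select-keys % [:Name])))

;; Merged: filter then project in a single pass, no intermediate pipe.
(def merged-eval (comp filter-step project-step))

(defn eval-chunk
  "Runs the merged steps over one chunk of tuples."
  [chunk]
  (into [] merged-eval chunk))
```

For example, `(eval-chunk [{:Name "Joe" :Age 30} {:Name "Kim" :Age 18}])` keeps only Joe and projects the tuple down to `:Name`.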



Processing Execution
Execution requirements
• Parallelism
 – Across plans
 – Across nodes in a plan
 – Within a parallelizable node in a plan
• Memory management
 – Allow arbitrarily large intermediate result sets without OutOfMemoryError
• Ops
 – Cancellation
 – Timeouts
 – Monitoring

Event-driven processing
• Dedicated I/O thread pools stream data into plan

 [Diagram: I/O threads feed the leaf fn nodes of the plan; compute threads run the remaining fn nodes]

Task creation
1. Callback fires when data is added to the input pipe
2. Callback takes the fn associated with the node
   and bundles it into a task
3. Task is scheduled with the compute thread pool
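
The steps above can be sketched with `java.util.concurrent.ForkJoinPool`. This is a simplified illustration under assumptions: `compute-pool` and `make-callback` are hypothetical names, and the returned futures are collected in an atom only so the example can be inspected.

```clojure
(import '(java.util.concurrent Callable ForkJoinPool))

;; Hypothetical shared compute pool; the talk's actual pool setup is not shown.
(def ^ForkJoinPool compute-pool (ForkJoinPool.))

(defn make-callback
  "Returns a callback that bundles the node's task-fn and the arriving chunk
   into a task and schedules it on the compute pool."
  [task-fn submitted]
  (fn [chunk]
    (let [^Callable task (fn [] (task-fn chunk))
          ;; Submit outside swap!, because swap! may retry its function.
          fut (.submit compute-pool task)]
      (swap! submitted conj fut)
      fut)))
```

Calling `((make-callback #(mapv inc %) (atom [])) [1 2 3])` returns a future whose `.get` yields the processed chunk.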




Fork/join vs Executors
• Fork/join thread pool, compared with classic Executors:
 – Optimized for finer-grained tasks
 – Optimized for larger numbers of tasks
 – Optimized for more cores
 – Works well on tasks with dependencies
 – No contention on a single queue
 – Work stealing for load balancing

Task execution
1. Pull next chunk from input pipe
2. Execute task function with access to node's state
3. Optionally, output one or more chunks to the output
   pipe - this triggers the callback of the node that reads from it
4. If data is still available, schedule a new task,
   simulating a new callback on the current node
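
A minimal sketch of one task run, following the four steps above. It assumes pipes are refs holding vectors of chunks and that a node is a plain map; `take-chunk!`, `run-task`, and the `schedule!` argument are hypothetical names. The callback on the output pipe (step 3) is not modelled here.

```clojure
(defn take-chunk!
  "Removes and returns the next chunk from the input pipe (a ref holding a
   vector of chunks), or nil when the pipe is empty."
  [input]
  (dosync
    (let [chunk (first @input)]
      (alter input #(vec (rest %)))
      chunk)))

(defn run-task
  "Runs one task for the node, then asks schedule! for another run if
   input is still available."
  [{:keys [input output task-fn state] :as node} schedule!]
  (when-some [chunk (take-chunk! input)]        ; 1. pull next chunk
    (let [result (task-fn state chunk)]         ; 2. run fn with node state
      (when (seq result)                        ; 3. optionally emit output
        (dosync (alter output conj result))))
    (when (seq @input)                          ; 4. more data: schedule again
      (schedule! node))))
```

In the real system `schedule!` would hand the node back to the compute pool; for a quick check it can simply call `run-task` again on the same thread.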



Concurrency
• Delicate balance between Clojure refs and STM
  and Java concurrency primitives
• Clojure refs - managed by STM
 – Input pipe
 – Output pipe
 – Node state
• Java concurrency
 – Semaphore - "permits" to limit tasks per node
 – Per-node scheduling lock
• Key integration constraint
 – Clojure transactions can fail and retry!
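
The retry constraint matters because anything done inside `dosync` may run more than once. A minimal sketch, assuming one `java.util.concurrent.Semaphore` per node: the permit is acquired and released outside any transaction, so a retried transaction cannot take a second permit. `run-with-permit` is a hypothetical helper.

```clojure
(import '(java.util.concurrent Semaphore))

(defn run-with-permit
  "Runs task-fn only if a permit is available, and always gives the permit
   back. The acquire and release happen outside any dosync block."
  [^Semaphore permits task-fn]
  (if (.tryAcquire permits)
    (try
      {:ran true :result (task-fn)}
      (finally
        (.release permits)))
    {:ran false}))
```

With `(Semaphore. 1)` the call runs the task and restores the permit afterwards; with `(Semaphore. 0)` it returns `{:ran false}` without running anything.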
Concurrency mechanisms

[Flowchart; only the box labels survive extraction, listed here in reading order]
process-input: acquire semaphore -> dequeue input -> input message (data / close / empty) -> create task -> invoke task -> result message (data / close / empty) -> enqueue data on output pipe -> release 1 semaphore
 – On a close message: set closed = true
 – Decision boxes: "Input closed?", "Closes output?"
 – run-task
close-output: closed && !closed_done -> acquire all semaphores -> set closed_done = true -> run-task w/ nil msg -> close output-pipe -> release all semaphores
Legend: blue outline = Java lock; all = under Java semaphore; green outline = Clojure transaction; blue shading = Clojure atom
Memory management
• Pipes are all on the heap
• How do we avoid OutOfMemory?




Buffered pipes
• When heap space is low, store pipe data on disk
• Data is serialized / deserialized to/from disk
• Memory-mapped files are used to improve I/O

 [Diagram: a pipe between fn nodes with its data written to disk as bytes (0100 …)]

Memory monitoring
• JMX memory beans
 – To detect when memory is tight -> writing to disk
   • Use memory pool threshold notifications
 – To detect when memory is ok -> write to memory
   • Use polling (no notification on decrease)
• Composite pipes
 – Build a logical pipe out of many segments
 – As memory conditions go up and down, each segment
  is written to the fastest place. We never move data.
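
A sketch of the two detection paths using the standard `java.lang.management` API. The helper names and the 0.9 fraction are assumptions for illustration. `arm-threshold!` sets the usage threshold that drives JMX threshold notifications (registering the listener itself is left out), and `memory-tight?` is the polling check that can be used to notice when usage has dropped again.

```clojure
(import '(java.lang.management ManagementFactory MemoryPoolMXBean
                               MemoryType MemoryUsage))

(defn heap-pools-with-threshold
  "Heap memory pools that support usage thresholds."
  []
  (filter (fn [^MemoryPoolMXBean pool]
            (and (= MemoryType/HEAP (.getType pool))
                 (.isUsageThresholdSupported pool)))
          (ManagementFactory/getMemoryPoolMXBeans)))

(defn arm-threshold!
  "Sets the pool's usage threshold to a fraction of its maximum size, when
   the maximum is defined. Returns the threshold in bytes, or nil."
  [^MemoryPoolMXBean pool fraction]
  (let [^MemoryUsage usage (.getUsage pool)
        limit (.getMax usage)]
    (when (pos? limit)
      (let [threshold (long (* fraction limit))]
        (.setUsageThreshold pool threshold)
        threshold))))

(defn memory-tight?
  "Polling check: is current usage above the given fraction of the maximum?"
  [^MemoryPoolMXBean pool fraction]
  (let [^MemoryUsage usage (.getUsage pool)
        limit (.getMax usage)]
    (and (pos? limit)
         (> (.getUsed usage) (* fraction limit)))))
```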



Cancellation
• Pool keeps track of which nodes belong to which
  plan
• All nodes check for cancellation during execution
• Cancellation can be caused by:
 – Error during execution
 – User intervention from admin UI
 – Timeout from query settings
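
Cooperative cancellation can be sketched as a per-plan flag that every node checks before doing work. The plan map and the names `make-plan`, `cancel!`, and `run-chunk` are hypothetical. The flag records the first cancellation reason (error, user, or timeout) and ignores later ones.

```clojure
(defn make-plan []
  {:cancelled (atom nil)})

(defn cancel!
  "Marks the plan as cancelled. Only the first reason is kept.
   Returns true if this call performed the cancellation."
  [plan reason]
  (compare-and-set! (:cancelled plan) nil reason))

(defn run-chunk
  "Checks for cancellation before running the node's task-fn on a chunk."
  [plan task-fn chunk]
  (if-let [reason @(:cancelled plan)]
    {:status :cancelled :reason reason}
    {:status :ok :result (task-fn chunk)}))
```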




Summary
• Data flow architecture
 – Event-driven by arrival of data
 – Compute threads never block
 – Fork/join to handle scheduling of work
• Clojure as abstraction tool
 – Expression tree lets us express plans concisely
 – Also lets us manipulate them with tools in Clojure
 – Lines of code
   • Fork/join pool, nodes, pipes - 1200
   • Buffer, serialization, memory monitor - 970
   • Processor, compiler, eval - 1900
• Open source? Hmmmmmmmmmmm…….
Thanks...
      Alex Miller
   @puredanger
   Revelytix, Inc.

More Related Content

Viewers also liked

Stream Execution with Clojure and Fork/join
Stream Execution with Clojure and Fork/joinStream Execution with Clojure and Fork/join
Stream Execution with Clojure and Fork/joinAlex Miller
 
Groovy concurrency
Groovy concurrencyGroovy concurrency
Groovy concurrencyAlex Miller
 
Project Fortress
Project FortressProject Fortress
Project FortressAlex Miller
 
Scaling Hibernate with Terracotta
Scaling Hibernate with TerracottaScaling Hibernate with Terracotta
Scaling Hibernate with TerracottaAlex Miller
 
Java Concurrency Gotchas
Java Concurrency GotchasJava Concurrency Gotchas
Java Concurrency GotchasAlex Miller
 
Scaling Your Cache And Caching At Scale
Scaling Your Cache And Caching At ScaleScaling Your Cache And Caching At Scale
Scaling Your Cache And Caching At ScaleAlex Miller
 
Collections In Java
Collections In JavaCollections In Java
Collections In JavaBinoj T E
 
Collection Framework in java
Collection Framework in javaCollection Framework in java
Collection Framework in javaCPD INDIA
 
Cracking clojure
Cracking clojureCracking clojure
Cracking clojureAlex Miller
 
Marshmallow Test
Marshmallow TestMarshmallow Test
Marshmallow TestAlex Miller
 
Java Collection framework
Java Collection frameworkJava Collection framework
Java Collection frameworkankitgarg_er
 
Java Collections
Java CollectionsJava Collections
Java Collectionsparag
 
Clojure: The Art of Abstraction
Clojure: The Art of AbstractionClojure: The Art of Abstraction
Clojure: The Art of AbstractionAlex Miller
 
Java Collections API
Java Collections APIJava Collections API
Java Collections APIAlex Miller
 
Java - Collections framework
Java - Collections frameworkJava - Collections framework
Java - Collections frameworkRiccardo Cardin
 

Viewers also liked (18)

Cold Hard Cache
Cold Hard CacheCold Hard Cache
Cold Hard Cache
 
Stream Execution with Clojure and Fork/join
Stream Execution with Clojure and Fork/joinStream Execution with Clojure and Fork/join
Stream Execution with Clojure and Fork/join
 
Groovy concurrency
Groovy concurrencyGroovy concurrency
Groovy concurrency
 
Project Fortress
Project FortressProject Fortress
Project Fortress
 
Scaling Hibernate with Terracotta
Scaling Hibernate with TerracottaScaling Hibernate with Terracotta
Scaling Hibernate with Terracotta
 
Java collection
Java collectionJava collection
Java collection
 
Java Concurrency Gotchas
Java Concurrency GotchasJava Concurrency Gotchas
Java Concurrency Gotchas
 
Scaling Your Cache And Caching At Scale
Scaling Your Cache And Caching At ScaleScaling Your Cache And Caching At Scale
Scaling Your Cache And Caching At Scale
 
07 java collection
07 java collection07 java collection
07 java collection
 
Collections In Java
Collections In JavaCollections In Java
Collections In Java
 
Collection Framework in java
Collection Framework in javaCollection Framework in java
Collection Framework in java
 
Cracking clojure
Cracking clojureCracking clojure
Cracking clojure
 
Marshmallow Test
Marshmallow TestMarshmallow Test
Marshmallow Test
 
Java Collection framework
Java Collection frameworkJava Collection framework
Java Collection framework
 
Java Collections
Java CollectionsJava Collections
Java Collections
 
Clojure: The Art of Abstraction
Clojure: The Art of AbstractionClojure: The Art of Abstraction
Clojure: The Art of Abstraction
 
Java Collections API
Java Collections APIJava Collections API
Java Collections API
 
Java - Collections framework
Java - Collections frameworkJava - Collections framework
Java - Collections framework
 

Similar to Concurrent Stream Processing Components

Visualizations using Visualbox
Visualizations using VisualboxVisualizations using Visualbox
Visualizations using VisualboxAlvaro Graves
 
A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets with Jules ...
A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets with Jules ...A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets with Jules ...
A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets with Jules ...Databricks
 
Graph Databases - Where Do We Do the Modeling Part?
Graph Databases - Where Do We Do the Modeling Part?Graph Databases - Where Do We Do the Modeling Part?
Graph Databases - Where Do We Do the Modeling Part?DATAVERSITY
 
A Tale of Three Apache Spark APIs: RDDs, DataFrames and Datasets by Jules Damji
A Tale of Three Apache Spark APIs: RDDs, DataFrames and Datasets by Jules DamjiA Tale of Three Apache Spark APIs: RDDs, DataFrames and Datasets by Jules Damji
A Tale of Three Apache Spark APIs: RDDs, DataFrames and Datasets by Jules DamjiData Con LA
 
Jan Lehnardt Couch Db In A Real World Setting
Jan Lehnardt Couch Db In A Real World SettingJan Lehnardt Couch Db In A Real World Setting
Jan Lehnardt Couch Db In A Real World SettingGeorge Ang
 
Visualize open data with Plone - eea.daviz PLOG 2013
Visualize open data with Plone - eea.daviz PLOG 2013Visualize open data with Plone - eea.daviz PLOG 2013
Visualize open data with Plone - eea.daviz PLOG 2013Antonio De Marinis
 
Jump Start into Apache® Spark™ and Databricks
Jump Start into Apache® Spark™ and DatabricksJump Start into Apache® Spark™ and Databricks
Jump Start into Apache® Spark™ and DatabricksDatabricks
 
Relational Database Fundamentals
Relational Database FundamentalsRelational Database Fundamentals
Relational Database FundamentalsKHALID C
 
GreenDao Introduction
GreenDao IntroductionGreenDao Introduction
GreenDao IntroductionBooch Lin
 
Python business intelligence (PyData 2012 talk)
Python business intelligence (PyData 2012 talk)Python business intelligence (PyData 2012 talk)
Python business intelligence (PyData 2012 talk)Stefan Urbanek
 
Tips And Tricks For Bioinformatics Software Engineering
Tips And Tricks For Bioinformatics Software EngineeringTips And Tricks For Bioinformatics Software Engineering
Tips And Tricks For Bioinformatics Software Engineeringjtdudley
 
Coral & Transport UDFs: Building Blocks of a Postmodern Data Warehouse​
Coral & Transport UDFs: Building Blocks of a Postmodern Data Warehouse​Coral & Transport UDFs: Building Blocks of a Postmodern Data Warehouse​
Coral & Transport UDFs: Building Blocks of a Postmodern Data Warehouse​Walaa Eldin Moustafa
 
Vital AI MetaQL: Queries Across NoSQL, SQL, Sparql, and Spark
Vital AI MetaQL: Queries Across NoSQL, SQL, Sparql, and SparkVital AI MetaQL: Queries Across NoSQL, SQL, Sparql, and Spark
Vital AI MetaQL: Queries Across NoSQL, SQL, Sparql, and SparkVital.AI
 
Structuring Apache Spark 2.0: SQL, DataFrames, Datasets And Streaming - by Mi...
Structuring Apache Spark 2.0: SQL, DataFrames, Datasets And Streaming - by Mi...Structuring Apache Spark 2.0: SQL, DataFrames, Datasets And Streaming - by Mi...
Structuring Apache Spark 2.0: SQL, DataFrames, Datasets And Streaming - by Mi...Databricks
 
Intro to Spark and Spark SQL
Intro to Spark and Spark SQLIntro to Spark and Spark SQL
Intro to Spark and Spark SQLjeykottalam
 
Overview of running R in the Oracle Database
Overview of running R in the Oracle DatabaseOverview of running R in the Oracle Database
Overview of running R in the Oracle DatabaseBrendan Tierney
 
Neo4j Morpheus: Interweaving Table and Graph Data with SQL and Cypher in Apac...
Neo4j Morpheus: Interweaving Table and Graph Data with SQL and Cypher in Apac...Neo4j Morpheus: Interweaving Table and Graph Data with SQL and Cypher in Apac...
Neo4j Morpheus: Interweaving Table and Graph Data with SQL and Cypher in Apac...Databricks
 
Structuring Spark: DataFrames, Datasets, and Streaming
Structuring Spark: DataFrames, Datasets, and StreamingStructuring Spark: DataFrames, Datasets, and Streaming
Structuring Spark: DataFrames, Datasets, and StreamingDatabricks
 
Druid Adoption Tips and Tricks
Druid Adoption Tips and TricksDruid Adoption Tips and Tricks
Druid Adoption Tips and TricksImply
 

Similar to Concurrent Stream Processing Components (20)

Visualizations using Visualbox
Visualizations using VisualboxVisualizations using Visualbox
Visualizations using Visualbox
 
A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets with Jules ...
A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets with Jules ...A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets with Jules ...
A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets with Jules ...
 
Graph Databases - Where Do We Do the Modeling Part?
Graph Databases - Where Do We Do the Modeling Part?Graph Databases - Where Do We Do the Modeling Part?
Graph Databases - Where Do We Do the Modeling Part?
 
A Tale of Three Apache Spark APIs: RDDs, DataFrames and Datasets by Jules Damji

Concurrent Stream Processing Components

  • 1. Concurrent Stream Processing Alex Miller - @puredanger Revelytix - http://revelytix.com
  • 2. Contents
      • Query execution - the problem
      • Plan representation - plans in our program
      • Processing components - building blocks
      • Processing execution - executing plans
  • 4. Relational Data & Queries
      Query: SELECT NAME FROM PERSON WHERE AGE > 20
      PERSON table:
        NAME | AGE
        Joe  | 30
  • 5. RDF: "Resource Description Framework" - a fine-grained graph representation of data
      Graph: the node http://data/Joe has an edge http://demo/age to 30 and an edge http://demo/name to "Joe"
      Subject         | Predicate        | Object
      http://data/Joe | http://demo/age  | 30
      http://data/Joe | http://demo/name | "Joe"
  • 6. SPARQL queries: SPARQL is a query language for RDF
      PREFIX demo: <http://demo/>
      SELECT ?name
      WHERE {
        ?person demo:age ?age.
        ?person demo:name ?name.
        FILTER (?age > 20)
      }
      Annotations: each "?person ..." line is a "triple pattern"; the two patterns form a natural join on ?person
  • 7. Relational-to-RDF
      • W3C R2RML mappings define how to virtually map a relational db into RDF
      SPARQL: PREFIX demo: <http://demo/> SELECT ?name WHERE { ?person demo:age ?age. ?person demo:name ?name. FILTER (?age > 20) }
      SQL: SELECT NAME FROM PERSON WHERE AGE > 20
      Data: the PERSON row (NAME = Joe, AGE = 30) maps to the RDF node http://data/Joe with http://demo/age 30 and http://demo/name "Joe"
  • 8. Enterprise federation
      • Model domain at enterprise level
      • Map into data sources
      • Federate across the enterprise (and beyond)
      Diagram: an enterprise-level SPARQL layer over several SPARQL and SQL sources
  • 9. Query pipeline
      • How does a query engine work?
      Diagram: SQL -> Parse -> AST -> Plan -> Plan -> Resolve -> Plan -> Optimize -> Plan -> Process -> Results! (with Metadata as an additional input)
  • 10. Trees! The same pipeline diagram, with the SQL input and the intermediate AST and plans labeled as trees
  • 12. SQL query plans
      SELECT Name, DeptName FROM Person, Dept WHERE Person.DeptID = Dept.DeptID AND Age > 20
      Plan: Person (Name, Age, DeptID) and Dept (DeptID, DeptName) -> join on DeptID -> filter Age > 20 -> project Name, DeptName
  • 13. SPARQL query plans
      SELECT ?Name WHERE { ?Person :Name ?Name . ?Person :Age ?Age . FILTER (?Age > 20) }
      Plan: TP1 { ?Person :Name ?Name } and TP2 { ?Person :Age ?Age } -> join on ?Person -> filter ?Age > 20 -> project ?Name
  • 14. Common model: streams of tuples flowing through a network of processing nodes
  • 15. What kind of nodes?
      • Tuple generators (leaves)
          – In SQL: a table or view
          – In SPARQL: a triple pattern
      • Combinations (multiple children)
          – Join
          – Union
      • Transformations
          – Filter
          – Project
          – Dup removal
          – Slice (limit / offset)
          – Sort
          – Grouping
          – etc
  • 16. Representation: tree data structure with nodes and attributes (as Java classes)
      PlanNode: childNodes
      TableNode: Table
      JoinNode: joinType, joinCriteria
      FilterNode: criteria
      ProjectNode: projectExpressions
      SliceNode: limit, offset
  • 17. s-expressions: tree data structure with nodes and attributes
      (* (+ 2 3) (- 6 5))
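Because Clojure code is itself a tree of lists, the expression on this slide can be held as data, inspected, and then evaluated. A minimal sketch; the name `expr` is illustrative and not from the talk:

```clojure
;; The slide's expression, quoted so it stays data instead of being evaluated.
(def expr '(* (+ 2 3) (- 6 5)))

;; It is an ordinary list: an operator symbol followed by child subtrees.
(first expr)   ; the symbol *
(rest expr)    ; the two child lists (+ 2 3) and (- 6 5)

;; eval walks the same tree to compute the value, i.e. (* 5 1).
(eval expr)
```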
  • 18. List representation: tree data structure with nodes and attributes
      (project+ [Name DeptName]
        (filter+ (> Age 20)
          (join+
            (table+ Empl [Name Age DeptID])
            (table+ Dept [DeptID DeptName]))))
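Held as quoted data, a plan like this can be traversed with ordinary sequence functions. The sketch below assumes only the shape shown on the slide; the helper `tables` is hypothetical and not part of the system described in the talk:

```clojure
;; The slide's plan, quoted so it is a nested list rather than a function call.
(def plan
  '(project+ [Name DeptName]
     (filter+ (> Age 20)
       (join+
         (table+ Empl [Name Age DeptID])
         (table+ Dept [DeptID DeptName])))))

(defn tables
  "Names of the tables read by the plan's table+ leaves, in depth-first order."
  [plan]
  (->> (tree-seq seq? rest plan)               ; every sub-form of the tree
       (filter #(and (seq? %) (= 'table+ (first %))))
       (map second)))

(tables plan)
```

Here `(tables plan)` yields the symbols `Empl` and `Dept`.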
  • 19. Query optimization: example - pushing criteria down
      (project+ [Name DeptName]
        (filter+ (> Age 20)
          (join+
            (project+ [Name Age DeptID]
              (bind+ [Age (- (now) Birth)]
                (table+ Empl [Name Birth DeptID])))
            (table+ Dept [DeptID DeptName]))))
  • 20. Query optimization: example - rewritten
      (project+ [Name DeptName]
        (join+
          (project+ [Name DeptID]
            (filter+ (> (- (now) Birth) 20)
              (table+ Empl [Name Birth DeptID])))
          (table+ Dept [DeptID DeptName])))
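A rewrite like this is a tree-to-tree function. The sketch below is a deliberately simplified rule, not the optimizer from the talk: it only pushes a `filter+` beneath a `join+` onto the one side whose `table+` columns cover the predicate, and it does not handle the `bind+` substitution shown on the slides. The helpers `columns` and `push-filter` are hypothetical names.

```clojure
(require '[clojure.walk :as walk])

(defn columns
  "Set of columns a node produces (only table+ and join+ are understood here)."
  [node]
  (let [[op & args] node]
    (case op
      table+ (set (second args))
      join+  (set (mapcat columns args))
      #{})))

(defn push-filter
  "Rewrite (filter+ pred (join+ l r)) so the filter sits on the side that
  supplies every column the predicate mentions; any other form is returned
  unchanged."
  [node]
  (if (and (seq? node)
           (= 'filter+ (first node))
           (seq? (nth node 2 nil))
           (= 'join+ (first (nth node 2))))
    (let [[_ pred [_ l r]] node
          all  (into (columns l) (columns r))
          used (set (filter all (flatten pred)))]   ; column symbols in pred
      (cond
        (every? (columns l) used) (list 'join+ (list 'filter+ pred l) r)
        (every? (columns r) used) (list 'join+ l (list 'filter+ pred r))
        :else node))
    node))

(def plan
  '(project+ [Name DeptName]
     (filter+ (> Age 20)
       (join+
         (table+ Empl [Name Age DeptID])
         (table+ Dept [DeptID DeptName])))))

;; Apply the rule bottom-up across the whole plan.
(def optimized (walk/postwalk push-filter plan))
```

In `optimized` the `filter+` now wraps only the `Empl` table, so fewer tuples reach the join.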
  • 21. Hash join conversion
      Before: join+ over the left tree and the right tree
      After: left tree -> preduce+ (hash-tuples) -> first+ -> hashes, bound by let+; right tree -> mapcat (tuple-matches) against hashes
  • 22. Hash join conversion
      Pattern:
        (join+ _left _right)
      Replacement:
        (let+ [hashes (first+
                        (preduce+ (hash-tuple join-vars {}
                                    #(merge-with concat %1 %2))
                                  _left))]
          (mapcat (fn [tuple] (tuple-matches hashes join-vars tuple))
                  _right))
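The same build-then-probe idea can be tried on plain Clojure sequences. This is a sequential sketch, not the stream version from the slide: the function names echo the slide, but their signatures and bodies here are assumptions.

```clojure
(defn hash-tuples
  "Build phase: index the left tuples by the values of their join variables."
  [join-vars tuples]
  (group-by #(select-keys % join-vars) tuples))

(defn tuple-matches
  "Probe phase: every left tuple that agrees with `tuple` on the join
  variables, merged with `tuple`."
  [hashes join-vars tuple]
  (map #(merge % tuple) (get hashes (select-keys tuple join-vars))))

(defn hash-join
  "Join two sequences of maps on join-vars."
  [join-vars left right]
  (let [hashes (hash-tuples join-vars left)]
    (mapcat #(tuple-matches hashes join-vars %) right)))

(def names [{:Person 1 :Name "Joe"} {:Person 2 :Name "Ann"}])
(def ages  [{:Person 1 :Age 30}    {:Person 3 :Age 40}])

(hash-join [:Person] names ages)
```

Only person 1 appears on both sides, so the join produces a single merged tuple.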
  • 23. Processing trees
      • Compile abstract nodes into more concrete stream operations: map+, pmap+, number+, mapcat+, pmapcat+, reorder+, filter+, pfilter+, rechunk+, preduce+, first+, pmap-chunk+, mux+, preduce-chunk+, let+, let-stream+
  • 24. Summary
      • SPARQL and SQL query plans have essentially the same underlying algebra
      • Model is a tree of nodes where tuples flow from leaves to the root
      • A natural representation of this tree in Clojure is as a tree of s-expressions, just like our code
      • We can manipulate this tree to provide
          – Optimizations
          – Differing levels of abstraction
  • 26. Pipes: pipes are streams of data (Producer -> Pipe -> Consumer)
      Producer side: (enqueue pipe item), (enqueue-all pipe items), (close pipe), (error pipe exception)
      Consumer side: (dequeue pipe item), (dequeue-all pipe items), (closed? pipe), (error? pipe)
  • 27-32. Pipe callbacks: events on the pipe trigger callbacks, which are executed on the caller's thread
      1. (add-callback pipe callback-fn)
      2. (enqueue pipe "foo")
      3. (callback-fn "foo") ;; during enqueue
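The callback sequence above can be imitated with a toy pipe. This sketch uses atoms and hypothetical constructor and field names; it is not the pipe implementation from the talk, which is described later as using refs:

```clojure
(defn make-pipe
  "A toy pipe: a queue of items plus a list of callbacks."
  []
  {:items     (atom clojure.lang.PersistentQueue/EMPTY)
   :callbacks (atom [])})

(defn add-callback [pipe f]
  (swap! (:callbacks pipe) conj f))

(defn enqueue [pipe item]
  (swap! (:items pipe) conj item)
  ;; Callbacks run right here, on whichever thread called enqueue.
  (doseq [f @(:callbacks pipe)]
    (f item)))

(def pipe (make-pipe))
(def seen (atom []))

;; 1. register a callback that records the item and the thread it ran on
(add-callback pipe (fn [item] (swap! seen conj [item (Thread/currentThread)])))
;; 2. enqueue; 3. the callback fires before enqueue returns
(enqueue pipe "foo")
```

After the call, `@seen` holds one entry pairing `"foo"` with the enqueuing thread.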
  • 33-34. Pipes: pipes are thread-safe functional data structures
  • 35. Batched tuples
      • To a pipe, data is just data. We actually pass data in batches through the pipe for efficiency.
      [ {:Name "Alex" :Eyes "Blue"}  {:Name "Jeff" :Eyes "Brown"}
        {:Name "Eric" :Eyes "Hazel"} {:Name "Joe"  :Eyes "Blue"}
        {:Name "Lisa" :Eyes "Blue"}  {:Name "Glen" :Eyes "Brown"} ]
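Batching can be illustrated with `partition-all`, so that each pipe operation moves a vector of tuples rather than a single tuple. The batch size of 4 and the helper names below are arbitrary choices for the sketch, not values from the talk:

```clojure
(def tuples
  [{:Name "Alex" :Eyes "Blue"}  {:Name "Jeff" :Eyes "Brown"}
   {:Name "Eric" :Eyes "Hazel"} {:Name "Joe"  :Eyes "Blue"}
   {:Name "Lisa" :Eyes "Blue"}  {:Name "Glen" :Eyes "Brown"}])

(defn batches
  "Split tuples into vectors of at most n tuples."
  [n tuples]
  (map vec (partition-all n tuples)))

(defn blue-eyed
  "A per-batch task: one call handles a whole batch."
  [batch]
  (filterv #(= "Blue" (:Eyes %)) batch))

(map blue-eyed (batches 4 tuples))
```

The six tuples become two batches (four and two tuples), so the task function runs twice instead of six times.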
  • 37. Pipe tee: send output to multiple destinations
  • 38. Nodes
      • A node transforms tuples from its input pipe and puts the results on its output pipe (Input Pipe -> Node fn -> Output Pipe)
      • Node fields: input-pipe, output-pipe, task-fn, state, concurrency
  • 39. Processing Trees
      • Tree of nodes (fn) and pipes, with data flowing through the tree
  • 40. SPARQL query example
      SELECT ?Name WHERE { ?Person :Name ?Name . ?Person :Age ?Age . FILTER (?Age > 20) }
      Plan: TP1 { ?Person :Name ?Name } and TP2 { ?Person :Age ?Age } -> join on ?Person -> filter ?Age > 20 -> project ?Name
      (project+ [?Name]
        (filter+ (> ?Age 20)
          (join+ [?Person]
            (triple+ [?Person :Name ?Name])
            (triple+ [?Person :Age ?Age]))))
  • 41. Processing tree
      TP1 { ?Person :Name ?Name } -> preduce+ (hash-tuples) -> first+ -> hashes, bound by let+
      TP2 { ?Person :Age ?Age } -> mapcat (tuple-matches) -> filter ?Age > 20 -> project ?Name
  • 42. Mapping to nodes
      • An obvious mapping to nodes and pipes: each operation (the two triple patterns, preduce+, first+, let+, filter+, project+) becomes its own node (fn), connected by pipes
  • 43. Mapping to nodes
      • Choosing between compilation and evaluation: the triple patterns, preduce+, first+ and let+ stay as nodes, while filter ?Age > 20 and project ?Name are merged into a single eval step
  • 44. Compile vs eval
      • We can evaluate our expressions
          – Directly on streams of Clojure data using Clojure
          – Indirectly via pipes and nodes (more on that next)
      • Final step before processing makes decision
          – Plan nodes that combine data are real nodes
          – Plan nodes that allow parallelism (p*) are real nodes
          – Most other plan nodes can be merged into single eval
          – Many leaf nodes actually rolled up, sent to a database
          – Lots more work to do on where these splits occur
  • 46. Execution requirements
      • Parallelism
          – Across plans
          – Across nodes in a plan
          – Within a parallelizable node in a plan
      • Memory management
          – Allow arbitrary intermediate result sets without an OutOfMemoryError (OOME)
      • Ops
          – Cancellation
          – Timeouts
          – Monitoring
  • 47. Event-driven processing
      • Dedicated I/O thread pools stream data into plan
      Diagram labels: I/O threads, Compute threads
  • 48. Task creation
      1. Callback fires when data added to input pipe
      2. Callback takes the fn associated with the node and bundles it into a task
      3. Task is scheduled with the compute thread pool
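Steps 2 and 3 can be sketched with Java's `ForkJoinPool` through Clojure interop. The function `schedule`, the pool size of 2, and the use of a plain vector as the chunk are assumptions for illustration; the talk does not show its scheduling code:

```clojure
(import '(java.util.concurrent ForkJoinPool Callable))

;; A small compute pool; the parallelism of 2 is an arbitrary choice.
(def compute-pool (ForkJoinPool. 2))

(defn schedule
  "Bundle a node's fn and one chunk of input into a task and submit it to
  the compute pool. Returns a future-like task object."
  [^ForkJoinPool pool node-fn chunk]
  (let [^Callable task (fn [] (node-fn chunk))]
    (.submit pool task)))

;; In the real system a pipe callback would make this call; here it is direct.
(def result (schedule compute-pool #(mapv inc %) [1 2 3]))

;; deref blocks the caller until the task has run on a pool thread.
@result
```

Dereferencing `result` yields the chunk after the node function has been applied to it.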
  • 49. Fork/join vs Executors
      • Fork/join thread pool vs classic Executors
          – Optimized for finer-grained tasks
          – Optimized for larger numbers of tasks
          – Optimized for more cores
          – Works well on tasks with dependencies
          – No contention on a single queue
          – Work stealing for load balancing
  • 50. Task execution
      1. Pull next chunk from input pipe
      2. Execute task function with access to node's state
      3. Optionally, output one or more chunks to output pipe - this triggers the upstream callback
      4. If data still available, schedule a new task, simulating a new callback on the current node
  • 51. Concurrency
      • Delicate balance between Clojure refs and STM and Java concurrency primitives
      • Clojure refs - managed by STM
          – Input pipe
          – Output pipe
          – Node state
      • Java concurrency
          – Semaphore - "permits" to limit tasks per node
          – Per-node scheduling lock
      • Key integration constraint
          – Clojure transactions can fail and retry!
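The integration constraint matters because anything done inside a `dosync` body may run more than once. One way to respect it, sketched here with hypothetical names (`input`, `permits`, `take-chunk!`, `run-task`) and a vector standing in for a pipe, is to keep the semaphore and the task function outside the transaction:

```clojure
(import 'java.util.concurrent.Semaphore)

(def input (ref [1 2 3]))                ; stands in for a node's input pipe
(def ^Semaphore permits (Semaphore. 1))  ; at most one running task for this node

(defn take-chunk!
  "Atomically remove and return the first item of the input, or nil if empty.
  The body has no side effects beyond the ref, so a retry is harmless."
  []
  (dosync
    (let [item (first @input)]
      (alter input #(vec (rest %)))
      item)))

(defn run-task
  "Run f on the next chunk if a permit is free; returns f's result or nil."
  [f]
  ;; The permit is taken outside any transaction: a retried transaction
  ;; would otherwise acquire it more than once.
  (when (.tryAcquire permits)
    (try
      (when-some [item (take-chunk!)]
        (f item))                        ; side effects happen after the commit
      (finally
        (.release permits)))))
```

Calling `(run-task #(* 10 %))` once consumes the first chunk and returns 10, leaving `[2 3]` in `input` and the permit released.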
  • 52. Concurrency mechanisms (flowchart; the layout is not recoverable from the transcript)
      Steps named in the chart: process-input, run-task, close-output
      Box labels include: acquire semaphore, dequeue input, input empty?, create task, invoke task, result message (empty / data / close), enqueue data on output pipe, set closed = true, closes output?, closed && !closed_done, acquire all semaphores, run-task with nil msg, close output pipe, set closed_done = true, release 1 semaphore, release all semaphores
      Legend: blue outline = Java lock; green outline = Clojure txn; blue shading = Clojure atom; a further marking = under Java semaphore
  • 53. Memory management
      • Pipes are all on the heap
      • How do we avoid OutOfMemory?
  • 54. Buffered pipes
      • When heap space is low, store pipe data on disk
      • Data is serialized / deserialized to/from disk
      • Memory-mapped files are used to improve I/O
  • 55. Memory monitoring
      • JMX memory beans
          – To detect when memory is tight -> writing to disk
              • Use memory pool threshold notifications
          – To detect when memory is ok -> write to memory
              • Use polling (no notification on decrease)
      • Composite pipes
          – Build a logical pipe out of many segments
          – As memory conditions go up and down, each segment is written to the fastest place. We never move data.
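The polling half of this scheme can be sketched with the standard `java.lang.management` beans. The function names and the idea of a single fractional threshold are assumptions; the talk does not show its monitor, and the notification half (usage-threshold listeners) is left out:

```clojure
(import '(java.lang.management ManagementFactory MemoryPoolMXBean MemoryType))

(defn heap-pool-usage
  "Map of heap pool name -> fraction of its maximum in use.
  The value is nil for pools whose maximum is undefined."
  []
  (into {}
        (for [^MemoryPoolMXBean pool (ManagementFactory/getMemoryPoolMXBeans)
              :when (= MemoryType/HEAP (.getType pool))
              :let [usage (.getUsage pool)]
              :when usage]                       ; nil if the pool is invalid
          [(.getName pool)
           (when (pos? (.getMax usage))
             (/ (double (.getUsed usage)) (.getMax usage)))])))

(defn memory-tight?
  "True when any heap pool is above the given fraction of its maximum."
  [threshold]
  (boolean (some #(and % (> % threshold)) (vals (heap-pool-usage)))))

;; A poller would call this periodically to decide where the next pipe
;; segment should be written.
(memory-tight? 0.8)
```

The 0.8 threshold is only an example value.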
  • 56. Cancellation
      • Pool keeps track of what nodes belong to which plan
      • All nodes check for cancellation during execution
      • Cancellation can be caused by:
          – Error during execution
          – User intervention from admin UI
          – Timeout from query settings
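Checking for cancellation during execution can be as simple as a shared flag consulted between chunks. This is a cooperative sketch with hypothetical names (`make-plan`, `cancel!`, `process-chunks`), not the mechanism from the talk:

```clojure
(defn make-plan
  "A plan handle carrying a cancellation flag shared by all of its nodes."
  []
  {:cancelled? (atom false)})

(defn cancel!
  "Request cancellation; an error handler, admin UI, or timeout could call this."
  [plan]
  (reset! (:cancelled? plan) true))

(defn process-chunks
  "Apply f to each chunk in turn, stopping as soon as the plan is cancelled.
  Returns the results produced before cancellation."
  [plan f chunks]
  (reduce (fn [acc chunk]
            (if @(:cancelled? plan)
              (reduced acc)               ; stop early, keep partial results
              (conj acc (f chunk))))
          []
          chunks))
```

For example, if `f` cancels the plan while handling the second of four chunks, only the first two results are returned.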
  • 57. Summary
      • Data flow architecture
          – Event-driven by arrival of data
          – Compute threads never block
          – Fork/join to handle scheduling of work
      • Clojure as abstraction tool
          – Expression tree lets us express plans concisely
          – Also lets us manipulate them with tools in Clojure
          – Lines of code
              • Fork/join pool, nodes, pipes - 1200
              • Buffer, serialization, memory monitor - 970
              • Processor, compiler, eval - 1900
      • Open source? Hmmmmmmmmmmm…….
  • 58. Thanks... Alex Miller @puredanger Revelytix, Inc.