This document discusses concurrent stream processing. It describes representing queries as trees of processing nodes connected by pipes. The nodes transform tuples flowing through the pipes and include operators like joins, filters, and projections. The execution model is event-driven with I/O threads streaming data to compute threads that run node tasks. Memory is managed using buffered pipes that can store data on disk. The system supports parallelism across plans, nodes, and tasks using a fork/join thread pool and concurrency primitives like semaphores and transactions.
2. Contents
• Query execution - the problem
• Plan representation - plans in our program
• Processing components - building blocks
• Processing execution - executing plans
4. Relational Data & Queries
SELECT NAME
FROM PERSON
WHERE AGE > 20

NAME  AGE
Joe   30
5. RDF
"Resource Description Framework" - a fine-grained graph representation of data
[Figure: graph in which the node http://data/Joe is linked by http://demo/age to 30 and by http://demo/name to "Joe"]
Subject Predicate Object
http://data/Joe http://demo/age 30
http://data/Joe http://demo/name "Joe"
6. SPARQL queries
SPARQL is a query language for RDF
PREFIX demo: <http://demo/>
SELECT ?name
WHERE {
  ?person demo:age ?age.      # a "triple pattern"
  ?person demo:name ?name.    # natural join on ?person
  FILTER (?age > 20)
}
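The query above can be mimicked on plain Clojure data. This is only a sketch: the helper name `match-pattern` and the representation of triples as vectors are assumptions made for illustration, not part of any SPARQL engine. Each triple pattern yields a set of binding maps, and `clojure.set/join` performs the natural join on the shared `:person` key.

```clojure
(require '[clojure.set :as set])

;; The two triples from the RDF slide, as [subject predicate object] vectors.
(def triples
  #{["http://data/Joe" "http://demo/age" 30]
    ["http://data/Joe" "http://demo/name" "Joe"]})

(defn match-pattern
  "Hypothetical helper: binding maps for every triple with the given predicate."
  [triples predicate subj-var obj-var]
  (set (for [[s p o] triples :when (= p predicate)]
         {subj-var s obj-var o})))

(def results
  (->> (set/join (match-pattern triples "http://demo/age" :person :age)
                 (match-pattern triples "http://demo/name" :person :name))
       (filter #(> (:age %) 20))   ; FILTER (?age > 20)
       (map :name)))               ; SELECT ?name
```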
7. Relational-to-RDF
• W3C R2RML mappings define how to virtually
map a relational db into RDF
SPARQL:
PREFIX demo: <http://demo/>
SELECT ?name
WHERE {
  ?person demo:age ?age.
  ?person demo:name ?name.
  FILTER (?age > 20)
}

SQL:
SELECT NAME
FROM PERSON
WHERE AGE > 20

[Figure: the relational row (NAME Joe, AGE 30) shown next to the RDF graph in which http://data/Joe has http://demo/age 30 and http://demo/name "Joe"]
8. Enterprise federation
• Model domain at enterprise level
• Map into data sources
• Federate across the enterprise (and beyond)
[Figure: SPARQL queries against the Enterprise model fan out to several SPARQL endpoints, which sit on top of SQL data sources]
9. Query pipeline
• How does a query engine work?
[Figure: SQL -> Parse -> AST -> Plan -> Plan -> Resolve -> Plan -> Optimize -> Plan -> Process -> Results!, with Metadata as a separate input]
10. Trees!
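[Figure: SQL -> Parse -> AST -> Plan -> Plan -> Resolve -> Plan -> Optimize -> Plan -> Process -> Results!, with Metadata as a separate input. The AST and each intermediate Plan are labeled "Trees!"]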
12. SQL query plans
SELECT Name, DeptName
FROM Person, Dept
WHERE Person.DeptID = Dept.DeptID
AND Age > 20
[Figure: plan tree. Person (Name, Age, DeptID) and Dept (DeptID, DeptName) -> join on DeptID -> filter Age > 20 -> project Name, DeptName]
18. List representation
Tree data structure with nodes and attributes
(project+ [Name DeptName]
  (filter+ (> Age 20)
    (join+
      (table+ Empl [Name Age DeptID])
      (table+ Dept [DeptID DeptName]))))
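Because the plan is ordinary quoted data, standard sequence functions can walk it. A minimal sketch, assuming (as the slide's naming suggests) that plan operators are the symbols ending in `+`; the helper names `plan-node?`, `children`, and `operators` are made up for this example.

```clojure
(def plan
  '(project+ [Name DeptName]
     (filter+ (> Age 20)
       (join+
         (table+ Empl [Name Age DeptID])
         (table+ Dept [DeptID DeptName])))))

(defn plan-node?
  "True for a list whose head is a symbol ending in +, e.g. (join+ ...)."
  [x]
  (and (seq? x)
       (symbol? (first x))
       (.endsWith (name (first x)) "+")))

(defn children
  "The child plan nodes of a node; attributes such as [Name Age] are skipped."
  [node]
  (filter plan-node? (rest node)))

(defn operators
  "Operators of the plan in depth-first, root-first order."
  [plan]
  (map first (tree-seq plan-node? children plan)))

(operators plan)
;; => (project+ filter+ join+ table+ table+)
```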
19. Query optimization
Example - pushing criteria down
(project+ [Name DeptName]
  (filter+ (> Age 20)
    (join+
      (project+ [Name Age DeptID]
        (bind+ [Age (- (now) Birth)]
          (table+ Empl [Name Birth DeptID])))
      (table+ Dept [DeptID DeptName]))))
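One way to sketch "pushing criteria down" as a tree rewrite: if every column a filter mentions comes from one side of a join, the filter can move below the join onto that side. The functions `columns` and `push-filter-down` are hypothetical and handle only flat predicates such as `(> Age 20)` and the four operators listed; this is not presented as the optimizer the talk describes.

```clojure
(defn columns
  "Set of columns a node produces. Covers only the operators used in this
   sketch; any other operator makes case throw."
  [[op & args]]
  (case op
    table+   (set (second args))
    project+ (set (first args))
    filter+  (columns (second args))
    join+    (into (columns (first args)) (columns (second args)))))

(defn push-filter-down
  "Rewrites (filter+ pred (join+ left right)) so the filter sits on the one
   side that supplies all of its columns; otherwise returns the plan as is."
  [plan]
  (let [[op pred child] plan]
    (if (and (= op 'filter+) (seq? child) (= (first child) 'join+))
      (let [[_ left right] child
            needed (set (filter symbol? (rest pred)))]
        (cond
          (every? (columns left) needed)
          (list 'join+ (list 'filter+ pred left) right)

          (every? (columns right) needed)
          (list 'join+ left (list 'filter+ pred right))

          :else plan))
      plan)))

(def pushed
  (push-filter-down
    '(filter+ (> Age 20)
       (join+
         (table+ Empl [Name Age DeptID])
         (table+ Dept [DeptID DeptName])))))
;; pushed is
;; (join+ (filter+ (> Age 20) (table+ Empl [Name Age DeptID]))
;;        (table+ Dept [DeptID DeptName]))
```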
24. Summary
• SPARQL and SQL query plans have essentially
the same underlying algebra
• Model is a tree of nodes where tuples flow from
leaves to the root
• A natural representation of this tree in Clojure is
as a tree of s-expressions, just like our code
• We can manipulate this tree to provide
– Optimizations
– Differing levels of abstraction
27. Pipe callbacks
Events on the pipe trigger callbacks which are
executed on the caller's thread
1. (add-callback pipe callback-fn)
2. (enqueue pipe "foo")
3. (callback-fn "foo") ;; during enqueue
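The three steps above can be sketched with a toy pipe. The names `add-callback` and `enqueue` come from the slide; `make-pipe` and the use of atoms are assumptions for illustration (a later slide says the real pipes are Clojure refs managed by STM). The point shown is that the callback runs inside `enqueue`, on the thread that called it.

```clojure
(defn make-pipe
  "Hypothetical constructor: a queue of data plus a list of callbacks."
  []
  {:queue     (atom clojure.lang.PersistentQueue/EMPTY)
   :callbacks (atom [])})

(defn add-callback [pipe callback-fn]
  (swap! (:callbacks pipe) conj callback-fn)
  pipe)

(defn enqueue [pipe data]
  (swap! (:queue pipe) conj data)
  ;; Callbacks run synchronously, on the thread that called enqueue.
  (doseq [cb @(:callbacks pipe)]
    (cb data))
  pipe)

;; Record what each callback saw and on which thread it ran.
(def seen (atom []))
(def pipe (make-pipe))

(add-callback pipe
              (fn [data]
                (swap! seen conj [data (.getName (Thread/currentThread))])))
(enqueue pipe "foo")
```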
35. Batched tuples
• To a pipe, data is just data. We actually pass
data in batches through the pipe for efficiency.
[
{:Name "Alex" :Eyes "Blue" }
{:Name "Jeff" :Eyes "Brown"}
{:Name "Eric" :Eyes "Hazel" }
{:Name "Joe" :Eyes "Blue"}
{:Name "Lisa" :Eyes "Blue" }
{:Name "Glen" :Eyes "Brown"}
]
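A small sketch of batching, using the tuples from the slide. The function name `batches` and the batch size of 4 are arbitrary choices for the example; `partition-all` keeps the final, shorter batch rather than dropping it.

```clojure
(def tuples
  [{:Name "Alex" :Eyes "Blue"}
   {:Name "Jeff" :Eyes "Brown"}
   {:Name "Eric" :Eyes "Hazel"}
   {:Name "Joe"  :Eyes "Blue"}
   {:Name "Lisa" :Eyes "Blue"}
   {:Name "Glen" :Eyes "Brown"}])

(defn batches
  "Splits a sequence of tuples into vectors of at most batch-size tuples."
  [batch-size tuples]
  (map vec (partition-all batch-size tuples)))

(def batched (batches 4 tuples))
;; Two batches: the first with 4 tuples, the second with the remaining 2.
```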
44. Compile vs eval
• We can evaluate our expressions
– Directly on streams of Clojure data using Clojure
– Indirectly via pipes and nodes (more on that next)
• The final step before processing decides how each plan node is realized
– Plan nodes that combine data are real nodes
– Plan nodes that allow parallelism (p*) are real nodes
– Most other plan nodes can be merged into a single eval
– Many leaf nodes are actually rolled up and sent to a database
– Lots more work to do on where these splits occur
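To illustrate evaluating a plan "directly on streams of Clojure data", here is a minimal interpreter for three of the operators. Everything in it is an assumption made for the example: the name `eval-plan`, rows as maps with keyword keys, predicates limited to the form `(op Column constant)`, and the sample rows themselves.

```clojure
;; Comparison symbols allowed in predicates, mapped to the real functions.
(def ops {'> > '< < '= =})

(defn eval-plan
  "Evaluates a plan tree against tables, a map of table symbol -> seq of rows."
  [tables [op & args]]
  (case op
    table+   (get tables (first args))
    filter+  (let [[[f col v] child] args
                   pred (ops f)]
               (filter #(pred (get % (keyword col)) v)
                       (eval-plan tables child)))
    project+ (let [[cols child] args
                   ks (map keyword cols)]
               (map #(select-keys % ks)
                    (eval-plan tables child)))))

;; Made-up sample data.
(def tables
  {'Empl [{:Name "Joe" :Age 30 :DeptID 1}
          {:Name "Ann" :Age 19 :DeptID 2}]})

(def rows
  (eval-plan tables
             '(project+ [Name]
                (filter+ (> Age 20)
                  (table+ Empl [Name Age DeptID])))))
```

Here the filter and projection run as lazy sequence operations in one pass, which is the flavor of merging several plan nodes into a single eval.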
46. Execution requirements
• Parallelism
– Across plans
– Across nodes in a plan
– Within a parallelizable node in a plan
• Memory management
– Allow arbitrarily large intermediate result sets without an OutOfMemoryError (OOME)
• Ops
– Cancellation
– Timeouts
– Monitoring
48. Task creation
1. A callback fires when data is added to the input pipe
2. The callback takes the fn associated with the node and bundles it into a task
3. The task is scheduled with the compute thread pool
[Figure: a Node, its fn, and the callback]
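The steps above might look roughly like this with `java.util.concurrent.ForkJoinPool`. The names `compute-pool` and `make-callback` are hypothetical, and for brevity the task closes over the data it was called with, whereas the tasks in the talk pull their input from the pipe when they run.

```clojure
(import '(java.util.concurrent ForkJoinPool))

;; Stand-in for the compute thread pool.
(def ^ForkJoinPool compute-pool (ForkJoinPool.))

(defn make-callback
  "Returns a callback that bundles node-fn into a task and submits it to pool.
   The callback returns the submitted task, which acts as a future."
  [^ForkJoinPool pool node-fn]
  (fn [data]
    (let [task (fn [] (node-fn data))]
      ;; The Callable hint selects the submit overload that returns a result.
      (.submit pool ^Callable task))))

(def callback (make-callback compute-pool (fn [batch] (count batch))))

;; Fire the callback as an enqueue would, then wait for the task's result.
(def result (.get (callback [1 2 3])))
```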
49. Fork/join vs Executors
• Fork/join thread pool vs classic Executors
– Optimized for finer-grained tasks
– Optimized for larger numbers of tasks
– Optimized for more cores
– Works well on tasks with dependencies
– No contention on a single queue
– Work stealing for load balancing
50. Task execution
1. Pull the next chunk from the input pipe
2. Execute the task function with access to the node's state
3. Optionally, output one or more chunks to the output pipe; this triggers the upstream callback
4. If data is still available, schedule a new task, simulating a new callback on the current node
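A single-node sketch of this loop. The node layout (`:input`, `:output`, `:node-fn`), the `run-task` signature, and the `schedule` argument are assumptions for illustration; atoms stand in for pipes, and `swap-vals!` requires Clojure 1.9 or later. In a full system, adding to the output would fire the next node's callback, and `schedule` would submit to the compute pool rather than call the task inline.

```clojure
(defn run-task
  "Pulls one chunk, applies the node's fn, emits any output, and reschedules
   itself through schedule while input remains."
  [node schedule]
  (let [{:keys [input output node-fn]} node
        ;; Atomically pop the queue; before/after are the old and new queues.
        [before after] (swap-vals! input pop)
        chunk (peek before)]
    (when (some? chunk)
      (when-some [out (node-fn chunk)]
        (swap! output conj out))
      (when (seq after)
        (schedule #(run-task node schedule))))))

(def node
  {:input   (atom (into clojure.lang.PersistentQueue/EMPTY [[1 2] [3 4]]))
   :output  (atom [])
   :node-fn (fn [chunk] (mapv inc chunk))})

;; For the example, "scheduling" a task simply runs it on the current thread.
(run-task node (fn [task] (task)))
```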
51. Concurrency
• Delicate balance between Clojure refs and STM
and Java concurrency primitives
• Clojure refs - managed by STM
– Input pipe
– Output pipe
– Node state
• Java concurrency
– Semaphore - "permits" to limit tasks per node
– Per-node scheduling lock
• Key integration constraint
– Clojure transactions can fail and retry!
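A sketch of why the retry constraint matters. A `dosync` body may run more than once, so anything with side effects, such as acquiring or releasing a Java `Semaphore`, is kept outside the transaction here. The names `input`, `permits`, `take-chunk!`, and `try-run`, and the limit of 2 permits, are made up for the example.

```clojure
(import 'java.util.concurrent.Semaphore)

(def input (ref clojure.lang.PersistentQueue/EMPTY))

;; At most 2 concurrent tasks for this node (an arbitrary limit).
(def ^Semaphore permits (Semaphore. 2))

(defn take-chunk!
  "Dequeues one chunk. Only ref reads and writes happen inside the
   transaction, so a retry is harmless."
  []
  (dosync
    (let [chunk (peek @input)]
      (alter input pop)
      chunk)))

(defn try-run
  "Runs node-fn on the next chunk if a permit is free. The permit is taken
   and released outside any transaction."
  [node-fn]
  (if (.tryAcquire permits)
    (try
      (when-some [chunk (take-chunk!)]
        (node-fn chunk))
      (finally
        (.release permits)))
    :no-permit))

(dosync (alter input conj [1 2 3]))
(def total (try-run #(reduce + %)))
```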
52. Concurrency mechanisms
[Figure: flowchart of the per-node concurrency mechanism. Steps shown: acquire semaphore, dequeue input, input message (empty, Data, or Close), create task, invoke task, result message (empty, Data, or Close), enqueue data on output pipe, set closed = true, check closed && !closed_done, set closed_done = true, acquire all semaphores, run-task w/ nil msg, close output pipe, release all semaphores, release 1 semaphore. Functions shown: process-input, run-task, close-output.]
Legend: Blue outline = Java lock; Green outline = Clojure txn; Blue shading = Clojure atom; a further marking denotes steps run under the Java semaphore
54. Buffered pipes
• When heap space is low, store pipe data on disk
• Data is serialized / deserialized to/from disk
• Memory-mapped files are used to improve I/O
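A minimal sketch of the serialize and deserialize round trip for one batch. It deliberately swaps in plain file I/O with EDN text (`spit`, `slurp`, `clojure.edn/read-string`) for the memory-mapped files and serialization format mentioned above, which the slides do not detail; `spill-batch!` and `load-batch` are hypothetical names.

```clojure
(require '[clojure.edn :as edn])
(import 'java.io.File)

(defn spill-batch!
  "Writes a batch to a temporary file as EDN text and returns the File."
  [batch]
  (let [f (File/createTempFile "pipe-segment" ".edn")]
    (.deleteOnExit f)
    (spit f (pr-str batch))
    f))

(defn load-batch
  "Reads a batch back from a file written by spill-batch!."
  [^File f]
  (edn/read-string (slurp f)))

(def batch [{:Name "Alex" :Eyes "Blue"} {:Name "Jeff" :Eyes "Brown"}])
(def segment (spill-batch! batch))
(def restored (load-batch segment))
```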
55. Memory monitoring
• JMX memory beans
– To detect when memory is tight -> write to disk
• Use memory pool threshold notifications
– To detect when memory is ok -> write to memory
• Use polling (no notification on decrease)
• Composite pipes
– Build a logical pipe out of many segments
– As memory conditions go up and down, each segment
is written to the fastest place. We never move data.
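A sketch of the polling side using the standard `java.lang.management` beans. The function names and the 0.8 fraction are arbitrary choices for the example, and registering a listener for threshold notifications is left out. Which heap pools support usage thresholds depends on the JVM and garbage collector, so `heap-pools` may return an empty sequence.

```clojure
(import '(java.lang.management ManagementFactory MemoryType MemoryPoolMXBean))

(defn heap-pools
  "Heap memory pools that support a usage threshold."
  []
  (filter (fn [^MemoryPoolMXBean p]
            (and (= MemoryType/HEAP (.getType p))
                 (.isUsageThresholdSupported p)))
          (ManagementFactory/getMemoryPoolMXBeans)))

(defn set-thresholds!
  "Sets each pool's usage threshold to fraction of its maximum size.
   Pools with no defined maximum (reported as -1) are skipped."
  [fraction]
  (doseq [^MemoryPoolMXBean p (heap-pools)]
    (when-let [usage (.getUsage p)]
      (let [max-bytes (.getMax usage)]
        (when (pos? max-bytes)
          (.setUsageThreshold p (long (* fraction max-bytes))))))))

(defn memory-tight?
  "Polls the pools: true if any pool is currently over its threshold."
  []
  (boolean (some (fn [^MemoryPoolMXBean p] (.isUsageThresholdExceeded p))
                 (heap-pools))))

(set-thresholds! 0.8)
(def tight? (memory-tight?))
```

A composite pipe could consult something like `memory-tight?` when starting each new segment to decide whether that segment goes to memory or to disk.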
56. Cancellation
• Pool keeps track of which nodes belong to which plan
• All nodes check for cancellation during execution
• Cancellation can be caused by:
– Error during execution
– User intervention from admin UI
– Timeout from query settings
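A sketch of cooperative cancellation: a per-plan flag that every node checks between chunks. The plan structure and the names `make-plan`, `cancel!`, and `process-chunks` are assumptions for illustration; the first cancellation reason wins.

```clojure
(defn make-plan []
  {:cancelled? (atom false)
   :reason     (atom nil)})

(defn cancel!
  "Marks the plan cancelled. Only the first caller records its reason."
  [plan reason]
  (when (compare-and-set! (:cancelled? plan) false true)
    (reset! (:reason plan) reason))
  plan)

(defn process-chunks
  "Applies node-fn to each chunk, stopping as soon as the plan is cancelled."
  [plan node-fn chunks]
  (reduce (fn [acc chunk]
            (if @(:cancelled? plan)
              (reduced acc)
              (conj acc (node-fn chunk))))
          []
          chunks))

(def plan (make-plan))

;; Simulate a timeout that arrives while the second chunk is being processed.
(def processed
  (process-chunks plan
                  (fn [chunk]
                    (when (= chunk 2) (cancel! plan :timeout))
                    (* 10 chunk))
                  [1 2 3]))
;; The third chunk is never processed.
```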
57. Summary
• Data flow architecture
– Event-driven by arrival of data
– Compute threads never block
– Fork/join to handle scheduling of work
• Clojure as abstraction tool
– Expression tree lets us express plans concisely
– Also lets us manipulate them with tools in Clojure
– Lines of code
• Fork/join pool, nodes, pipes - 1200
• Buffer, serialization, memory monitor - 970
• Processor, compiler, eval - 1900
• Open source? Hmmmmmmmmmmm…….
58. Thanks...
Alex Miller
@puredanger
Revelytix, Inc.