-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathfold.clj
1566 lines (1431 loc) · 67.4 KB
/
fold.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
(ns jepsen.history.fold
"Provides a stateful folder for running folds (like `reduce`) over chunked,
immutable collections in linear and concurrent passes. Intended for systems
where the reduction over a chunk may involve expensive work, and not fit in
memory--for instance, deserializing values from disk. Provides sophisticated
optimizations for running folds in parallel, and automatically fusing
together multiple folds.
To build a folder, you need a chunkable collection: see
jepsen.history.core. Jepsen.history chunks vectors by default at 16384
elements per chunk, which is a bit big for a demonstration, so let's chunk
explicitly:
(require '[tesser.core :as t] '[jepsen.history [core :as hc] [fold :as f]])
(def dogs [{:legs 6, :name :noodle},
{:legs 4, :name :stop-it},
{:legs 4, :name :brown-one-by-the-fish-shop}])
(def chunked-dogs (hc/chunked 2 dogs))
(pprint (hc/chunks chunked-dogs))
; ([{:legs 6, :name :noodle} {:legs 4, :name :stop-it}]
; [{:legs 4, :name :brown-one-by-the-fish-shop}])
In real use, chunks should be big enough to take a bit (a second or so?) to
reduce. We keep track of some state for each chunk, so millions is probably
too many. If you have fewer chunks than processors, we won't be able to
optimize as efficiently.
A folder wraps a chunked collection, like so:
(def f (f/folder chunked-dogs))
Now we can perform a reduction on the folder. This works just like Clojure
reduce:
(reduce (fn [max-legs dog]
(max max-legs (:legs dog)))
0
e)
; => 6
Which means transducers and into work like you'd expect:
(into #{} (map :legs) f)
; => #{4 6}
OK, great. What's the point? Imagine we had a collection where getting
elements was expensive--for instance, if they required IO or expensive
processing. Let's put ten million dogs on disk as JSON.
(require '[jepsen.history.fold-test :as ft])
(def dogs (ft/gen-dogs-file! 1e7))
(def f (f/folder dogs))
Reducing over ten million dogs as JSON takes about ten seconds on my
machine.
(time (into #{} (map :legs) dogs))
; 10364 msecs
But with a folder, we can do something *neat*:
(def leg-set {:reducer-identity (constantly #{})
:reducer (fn [legs dog] (conj legs (:legs dog)))
:combiner clojure.set/union})
(time (f/fold f leg-set))
; 1660 msecs
This went roughly six times faster because the folder reduced each chunk in
parallel. Now let's run, say, ten reductions in parallel.
(time (doall (pmap (fn [_] (into #{} (map :legs) dogs)) (range 10))))
; 28477 msecs
This 28 seconds is faster than running ten folds sequentially (which would
have been roughly 87 seconds), because we've got multiple cores to do the
reduction. But we're still paying a significant cost because each of those
reductions has to re-parse the file as it goes.
(time (doall (pmap (fn [_] (f/fold f leg-set)) (range 10))))
; 2261 msecs
Twelve times faster than the parallel version! And roughly 45x faster than
doing the reductions naively in serial. How? Because when you ask a folder to
reduce something, and it's already running another reduction, it *joins* your
new reduction to the old one and performs most (or even all) of the work in
a single pass. The folder is smart enough to do this for both linear and
concurrent folds--and it does it while ensuring strict order and thread
safety for mutable accumulators. Let's replace that reduction with a mutable
HashSet, and convert it back to a Clojure set at the end.
(import java.util.HashSet)
(defn mut-hash-set [] (HashSet.))
(def fast-leg-set {:reducer-identity mut-hash-set
:reducer (fn [^HashSet s, dog]
(.add s (:legs dog)) s)
:combiner-identity mut-hash-set
:combiner (fn [^HashSet s1, ^HashSet s2]
(.addAll s1 s2) s1)
:post-combiner set})
(time (doall (pmap (fn [_] (f/fold f fast-leg-set)) (range 10))))
; 2197 msecs
# In general
A fold represents a reduction over a history, which can optionally be
executed over chunks concurrently. It's a map with the following fields:
; Metadata
:name The unique name of this fold. May be any object, but
probably a keyword.
; How to reduce a chunk
:reducer-identity A function (f history) which generates an identity
object for a reduction over a chunk.
:reducer A function (f history acc op) which takes a history, a
chunk accumulator, and an operation from the history,
and returns a new accumulator.
:post-reducer A function (f history acc) which takes the final
accumulator from a chunk and transforms it before being
passed to the combiner
; How to combine chunks together
:combiner-identity A function (f history) which generates an identity
object for combining chunk results together.
:combiner A function (f history acc chunk-result) which folds
the result of a chunk into the combiner's accumulator.
If nil, performs a left fold linearly, and does not
combine at all.
:post-combiner A function (f history acc) which takes the final acc
from merging all chunks and produces the fold's return
value.
; Execution hints
:associative? If true, the combine function is associative, and can
be applied in any order. If false, we must combine
left-to-right. Right now this does nothing; we haven't
implemented associative combine.
:asap? Folders ramp up processing of concurrent folds gradually
to give other folds a chance to join the pass. Setting
this to `true` disables that optimization, which means a
fold can complete more quickly--at the cost of slowing
down other folds.
Folds should be pure functions of their histories, though reducers and
combiners are allowed to use in-memory mutability; each is guaranteed to be
single-threaded. We guarantee that reducers execute at most once per element,
and at most once per chunk. When not using `reduced`, they are exactly-once.
The final return value from a fold should be immutable so that other readers
or folds can use it safely in a concurrent context.
## Early Termination
For linear folds (e.g. those with no combiner), we support the usual
`(reduced x)` early-return mechanism. This means reduce and transduce work as
you'd expect. Reduced values still go through the post-reducer function.
For concurrent folds, `(reduced x)` in a reducer terminates reduction of that
particular chunk. Other chunks are still reduced. Reduced values
go through post-reduce and are passed to the combiner unwrapped.
If the combiner returns a reduced value, that value passes immediately to the
post-combiner, without considering any other chunks.
Like transducers, post-reducers and post-combiners act on the values inside
Reduced wrappers; they cannot distinguish between reduced and non-reduced
values."
(:refer-clojure :exclude [reduce for])
(:require [clojure [core :as c]
[pprint :as pprint :refer [pprint]]]
[clojure.tools.logging :refer [info warn]]
[dom-top.core :as dt :refer [assert+ loopr]]
[jepsen.history [core :as hc]
[task :as task]]
[slingshot.slingshot :refer [try+ throw+]]
[tesser [core :as tesser]
[utils :refer [maybe-unary]]])
(:import (clojure.lang IReduce
IReduceInit)
(io.lacuna.bifurcan DirectedGraph
IEdge
IGraph
Graph)))
; Utilities
(defn maybe-get-task
"Gets a task in a state, or passes through nil."
[state task-id]
(when-not (nil? task-id)
(task/get-task state task-id)))
(defn maybe-task-id
"Gets the ID of a task, passing through nil."
[task]
(when-not (nil? task)
(task/id task)))
(defn last-missing-index
"Takes an array and returns the index of the last missing element, or
Int/MIN_VALUE if none are missing. Why not -1? Simplifies some of our
comparison logic."
[^objects ary]
(loop [i (dec (alength ary))]
(if-not (aget ary i)
i
(if (= i 0)
Integer/MIN_VALUE
(recur (dec i))))))
(defn reduced-wrapper
"Wraps a reducing function to wrap reduced results in another reduced
wrapper, so that we can detect it and terminate early."
[reducer]
(fn wrapper [acc x]
(let [acc' (reducer acc x)]
(if (reduced? acc')
(reduced acc')
acc'))))
; Support functions for constructing folds
(defn validate-fold
"Throws if fold is malformed. Returns fold otherwise."
[fold]
(when-not (:name fold)
(throw+ {:type ::no-name, :fold fold}))
(when-not (ifn? (:reducer-identity fold))
(throw+ {:type :no-reducer-identity, :fold fold}))
(when-not (ifn? (:reducer fold))
(throw+ {:type :no-reducer, :fold fold}))
(when-not (ifn? (:post-reducer fold))
(throw+ {:type :no-post-reducer, :fold fold}))
; Combiners are optional, but if you give one, you need all three fns
(when (:combiner fold)
(when-not (ifn? (:combiner fold))
(throw+ {:type :no-combiner, :fold fold}))
(when-not (ifn? (:combiner-identity fold))
(throw+ {:type :no-combiner-identity, :fold fold}))
(when-not (ifn? (:post-combiner fold))
(throw+ {:type :no-post-combiner, :fold fold})))
fold)
(defn make-fold
"Takes a fold map, or a function, or two functions, and expands them into a
full fold map. Generally follows the same rules as transducers:
https://clojure.org/reference/transducers#_creating_transducers. With a map:
- `:name`: default :fold
- `:associative?`: default false
- `:reducer-identity`: defaults to `:reducer`
- `:reducer`: must be provided
- `:post-reducer`: defaults to `:reducer`, or `identity` if `:reducer` has
no unary arity.
- `:combiner-identity`: defaults to `:combiner`
- `:combiner`: defaults to nil
- `:post-combiner` defaults to `:combiner`, or `identity` if `:combiner` has
no unary arity.
With a single function, works exactly like transducers. Constructs a
non-associative fold named `:fold` with no combiner, and using f for all
three reducer functions.
With two functions, uses the first for all three reducers, and the second for
all three combiners. This is the *opposite* arity from
`clojure.core.reducers/fold`, but I really do think it makes more sense,
since reduce happens first."
([fn-or-map]
(if (map? fn-or-map)
(let [reducer (if (map? fn-or-map)
(assert+ (:reducer fn-or-map)
(str "No :reducer provided! "
(pr-str fn-or-map)))
fn-or-map)
combiner (:combiner fn-or-map)]
(assoc fn-or-map
:name (or (:name fn-or-map) :fold)
:reducer-identity (or (:reducer-identity fn-or-map) reducer)
:post-reducer (or (:post-reducer fn-or-map)
(maybe-unary reducer))
:combiner-identity (or (:combiner-identity fn-or-map)
combiner)
:post-combiner (or (:post-combiner fn-or-map)
(when combiner
(maybe-unary combiner)))
:associative? (:associative? fn-or-map false)))
; Single reducer fn
{:name :fold
:reducer-identity fn-or-map
:reducer fn-or-map
:post-reducer (maybe-unary fn-or-map)
:associative? false}))
([reducer combiner]
{:name :fold
:associative? false
:reducer-identity reducer
:reducer reducer
:post-reducer (maybe-unary reducer)
:combiner-identity combiner
:combiner combiner
:post-combiner (maybe-unary combiner)}))
; We want to run multiple folds concurrently. This FusedFold datatype allows
; that, and can be extended with new folds later. It returns a vector of
; results for each individual fold it contains.
(defrecord FusedFold
[; These fields let us work transparently like any other fold map
name
reducer-identity
reducer
post-reducer
combiner-identity
combiner
post-combiner
associative?
; But we also track a vector of the original folds we're now fusing.
folds])
; Debugging with full fn values is exhausting
(defmethod pprint/simple-dispatch jepsen.history.fold.FusedFold
[fold]
(.write ^java.io.Writer *out*
(str "(FusedFold " (pr-str (:name fold))
(when (:associative? fold)
" :assoc")
;(pr-str (:folds fold)))
")"
)))
(defn fused?
"Is this fold a fused fold?"
[fold]
(instance? FusedFold fold))
(defn ary->vec
"Turns arrays into vectors recursively."
[x]
(if (= "[Ljava.lang.Object;" (.getName (class x)))
(mapv ary->vec x)
x))
(defn fuse
"Takes a fold (possibly a FusedFold) and fuses a new fold into it. Also
provides a means of joining in-process reducer/combiner state together, so
that you can zip together two independent folds and continue with the fused
fold halfway through. Returns a map of:
:fused A new FusedFold which performs everything the original
fold did *plus* the new fold. Its post-combined results
are a pair of [old-res new-res].
:join-accs A function which joins together accumulators from the
original and new fold. Works on both reduce and combine
accumulators. Returns an accumulator for the new fold.
Works with both folds that have combiners and those that don't.
If this isn't fast enough, we might try doing some insane reflection and
dynamically compiling a new class to hold reducer state with primitive
fields."
[old-fold new-fold]
(let [_ (when (:combiner old-fold)
(assert+ (:combiner new-fold)
{:type ::mismatched-folds
:old old-fold
:new new-fold}))
; If we're fusing into a fused fold, we slot in our fold at the end of
; its existing folds.
folds' (if (fused? old-fold)
(conj (:folds old-fold) new-fold)
; We have no insight here; just make a pair.
[old-fold new-fold])
n (count folds')
_ (assert (< 1 n))
combiner? (boolean (:combiner old-fold))
; We need functions to join together reducer and combiner state.
; The shape here is going to depend on whether the original fold was a
; FusedFold (in which case it has an array of accs) or a normal fold
; (in which case it has a single acc).
join-accs (if (fused? old-fold)
; Old fold has an array; new fold has a single acc.
(fn join-fused [^objects old-accs, new-acc]
(let [old-n (alength old-accs)
accs' (object-array (inc old-n))]
(System/arraycopy old-accs 0 accs' 0 old-n)
(aset accs' old-n new-acc)
accs'))
; Construct tuples
(fn join-unfused [old-acc new-acc]
(object-array [old-acc new-acc])))
split-accs (if (fused? old-fold)
; Old fold has an array; new fold has a single acc.
(fn split-fused [^objects accs]
(let [old-n (dec n)
old-accs (object-array old-n)]
(System/arraycopy accs 0 old-accs 0 old-n)
[old-accs (aget accs old-n)]))
; Both simple folds
(fn split-unfused [^objects accs]
[(aget accs 0) (aget accs 1)]))
; The reducer identity constructs an array of reducer identities
reducer-identity
(fn reducer-identity []
(->> folds'
(map (fn [fold]
((:reducer-identity fold))))
object-array))
; Reducers update each field in the array. This is a *very* hot path.
; We pre-materialize an array of reducer functions to speed up
; traversal. We clobber reducer array state in-place. This SHOULD, I
; think, be OK because of the memory happens-before effects of the task
; executor's queue.
reducers (object-array (map :reducer folds'))
reducer (fn reducer [^objects accs x]
(loop [i 0]
(if (< i n)
(let [reducer (aget reducers i)
acc (aget accs i)
acc' (reducer acc x)]
(aset accs i acc')
(recur (unchecked-inc-int i)))
; Done
accs)))
post-reducer (if combiner?
; Post-reducers just update each field in the array.
; We can clobber it in-place.
(let [post-reducers (object-array
(map :post-reducer folds'))]
(fn post-reducer [^objects accs]
(loop [i 0]
(if (< i n)
(let [post-reducer (aget post-reducers i)
acc (aget accs i)
acc' (post-reducer acc)]
(aset accs i acc')
(recur (unchecked-inc-int i)))
; Done
accs))))
; Unlift results back to [old-res new-res]
(let [old-post-reducer (:post-reducer old-fold)
new-post-reducer (:post-reducer new-fold)]
(fn post-reducer-unlift [accs]
(let [[old-acc new-acc] (split-accs accs)]
[(old-post-reducer old-acc)
(new-post-reducer new-acc)]))))
; Combiners: same deal
combiner-identity (when combiner?
(fn combiner-identity []
(->> folds'
(map (fn [fold]
((:combiner-identity fold))))
object-array)))
combiners (object-array (map :combiner folds'))
combiner (when combiner?
(fn combiner [^objects accs ^objects xs]
(loop [i 0]
(if (< i n)
(let [combiner (aget combiners i)
acc (aget accs i)
x (aget xs i)
acc' (combiner acc x)]
(aset accs i acc')
(recur (unchecked-inc-int i)))
accs))))
; Turn things back into a pair of [old-res new-res] on the way
; out.
old-post-combiner (:post-combiner old-fold)
new-post-combiner (:post-combiner new-fold)
post-combiner (when combiner?
(fn post-combiner [accs]
(let [[old-acc new-acc] (split-accs accs)]
[(old-post-combiner old-acc)
(new-post-combiner new-acc)])))
fused (map->FusedFold
{:name [(:name old-fold) (:name new-fold)]
:associative? (and (:associative? old-fold)
(:associative? new-fold))
:asap? (or (:asap? old-fold)
(:asap? new-fold))
:reducer-identity reducer-identity
:reducer reducer
:post-reducer post-reducer
:combiner-identity combiner-identity
:combiner combiner
:post-combiner post-combiner
:folds folds'})]
{:fused fused
:join-accs join-accs
:split-accs split-accs}))
; Task construction
(defn task-workers
"Takes a Task and returns a vector of tasks which actually do work (e.g. call
a reducer or combiner) for this particular chunk. We need this to figure out
if tasks are safely cancellable, or if some of their work has begun."
[task]
(case (first (task/name task))
(:reduce, :combine, :deliver)
[task]
(:split-reduce, :join-reduce, :split-combine, :join-combine)
(:workers (task/data task))))
(def concurrent-reduce-task-unlock-factor
"This factor controls how quickly concurrent reduce tasks 'unlock' reductions
of later chunks. 8 means that each task unlocks roughly 8 later tasks."
8)
;; Linear fold tasks
(defn make-linear-reduce-task
"Takes a task executor state, a fold, chunks, an index into the chunks, and,
when 0 < i, a previous reduce task. Returns [state' task], where task
continues the previous task's reduction."
([state fold chunks i]
(make-linear-reduce-task state fold chunks i ::first))
([state {:keys [reducer-identity, reducer]} chunks i prev-reduce]
; In order to support early return, we need to know if the reduction on a
; single chunk reduced early or not.
(let [reducer (reduced-wrapper reducer)]
(task/submit state
[:reduce i]
(if (= i 0) [] [prev-reduce])
(fn task [in]
; (info "Linear reduce" i (first in))
(let [acc (if (= i 0)
(reducer-identity)
(first in))]
(if (reduced? acc)
acc
(c/reduce reducer acc (nth chunks i)))))))))
(defn make-linear-combine-task
"Takes a task executor state, a fold, chunks, and the final reduce task.
Returns [state' task] where task is a new task which applies the post-reduce
to the last reduced value, then performs a single combine. Or, if combiner is
not provided, just does post-reduce."
[state {:keys [reducer-identity reducer post-reducer combiner-identity
combiner]} chunks last-reduce]
(task/submit state
[:combine]
[last-reduce]
(fn task [[reduce-acc]]
; (info "Linear combine" reduce-acc)
; Unwrap reduced
(let [reduce-acc (unreduced reduce-acc)
post-reduced (post-reducer reduce-acc)]
(if combiner
; Do a trivial combine
(combiner (combiner-identity) post-reduced)
; Done
post-reduced)))))
;; Concurrent fold tasks
(defn make-concurrent-reduce-task
"Takes a task executor state, a fold, chunks, and a vector of
previously launched reduce tasks for this fold, and an index into the chunks
i. Returns [state' task]: a new task to reduce that chunk.
Unless the fold requests `:asap? true`, we introduce synthetic dependencies
to slow down later reductions. This is less speedy for single folds, but for
multiple folds we actually want to *defer* starting work until later--that
way the later folds have a chance to join and cancel our tasks. So even
though we *could* rush ahead and launch every reduce concurrently, we inject
dependencies between reduce tasks forming a tree: the first chunk unlocks the
second and third, which unlock the fourth through seventh, and so on."
[state {:keys [asap? reducer-identity, reducer, post-reducer]}
chunks prev-reduce-tasks i]
(let [ordering-deps
(when (and (not asap?)
(seq prev-reduce-tasks))
[(nth prev-reduce-tasks
(long (/ (count prev-reduce-tasks)
concurrent-reduce-task-unlock-factor)))])]
(task/submit state
[:reduce i]
ordering-deps
(fn task [_]
; (info :reduce i)
(let [acc (c/reduce reducer
(reducer-identity)
(nth chunks i))]
(post-reducer acc))))))
(defn make-concurrent-combine-task
"Takes a task executor state, a fold, chunks, a chunk index, and either:
1. A reduce task (for the first combine)
2. A previous combine task and a reduce task (for later combines)
Returns [state' task]: a task which combines that chunk with earlier
combines."
([state {:keys [combiner-identity combiner]} chunks i reduce-task]
(task/submit state
[:combine i]
[reduce-task]
(fn first-task [[post-reduced]]
; (info :first-combine i post-reduced)
(let [res (combiner (combiner-identity) post-reduced)]
res))))
([state {:keys [combiner post-combiner]} chunks i
prev-combine-task reduce-task]
(task/submit state
[:combine i]
[prev-combine-task reduce-task]
(fn task [[prev-combine post-reduced]]
; (info :combine i prev-combine post-reduced)
(if (reduced? prev-combine)
; Early return; skip this chunk
; TODO: we might want to use state to bypass later
; reductions, since they aren't used.
prev-combine
(let [res (combiner prev-combine post-reduced)]
res))))))
; Tasks which work for both linear and concurrent folds
(declare clear-old-passes!)
(defn make-deliver-task
"Takes a task executor state, a final combine task, and a function which
delivers results to the output of a fold. Returns [state' task], where task
applies the post-combiner of the fold and calls deliver-fn with the results.
Can also take an optional Folder; we clean up old passes automatically
once delivery occurs."
([state fold task deliver-fn]
(make-deliver-task state fold task deliver-fn nil))
([state {:keys [post-combiner]} task deliver-fn executor]
(let [[state deliver-task]
(task/submit state
[:deliver]
[task]
(fn deliver [[combined]]
; (info :deliver combined)
(try
(let [combined (unreduced combined)
res (if post-combiner
(post-combiner combined)
combined)]
(deliver-fn res))
(finally
(when executor (clear-old-passes! executor))))))
; If something goes wrong in our pipeline, we deliver a CapturedThrow
; instead.
[state catch-task]
(task/catch state :deliver-catch deliver-task
(fn catch [err] (deliver-fn (task/->CapturedThrow err))))]
[state deliver-task])))
(defn split-deliver-fn
"Takes an old and new pass. Constructs a function which takes an [old-res
new-res] pair, or a CapturedThrowable, and delivers them to the old and new
folds' deliver fns, respectively."
[old-pass new-pass]
(let [old-deliver (:deliver old-pass)
new-deliver (:deliver new-pass)]
(fn deliver [input]
(try
(if (task/captured-throw? input)
(do (old-deliver input)
(new-deliver input))
(let [[old-res new-res] input]
(old-deliver old-res)
(new-deliver new-res)))
(catch Throwable t
(warn t "Split deliver of"
(pr-str input) "failed! Some folds may never complete."))))))
(defn make-split-task
"Takes a task executor state, a name, a function that splits an accumulator,
an index to extract from that accumulator, and a task producing that
accumulator. Returns [state' task], where task returns the given index in the
accumulator task."
[state name split-accs i acc-task]
(task/submit state
name
{:workers (task-workers acc-task)}
[acc-task]
(fn split [[acc]]
;(info :split acc)
(nth (split-accs acc) i))))
(defn make-join-task
"Takes a task executor state, a name, a function that joins two accumulator,
and accumulator tasks a and b. Returns [state' task], where task returns the
two accumulators joined. Join tasks keep a vector of :worker tasks they
depend on."
[state name join-accs a-task b-task]
(task/submit state
name
{:workers (into (task-workers a-task)
(task-workers b-task))}
[a-task b-task]
(fn join [[a b]]
;(info :join a b)
(join-accs a b))))
(defn task-work-pending?
"We may have a join task which unifies work from two different reducers or
combiners, or one which splits a reducer or combiner. If we cancel just the
join (split), the reducers (combiners) will still run. This takes a state and
a task and returns true if all the actual work involved in this particular
chunk is still pending. You can also pass :cancelled as a task; this is
always pending."
[state task]
(or (identical? task :cancelled)
(every? (partial task/pending? state) (task-workers task))))
(defn cancel-task-workers
"Takes a state and a reduce, combine, split, or join task. Cancels not only
this task, but all of its dependencies which actually perform the work for
its particular chunk. Returns state'.
A task of :cancelled yields no changes."
[state task]
(condp identical? task
; Already cancelled
:cancelled state
; Unknown, definitely not cancellable!
nil (throw (ex-info {:type :impossible-cancellation
:task task}))
(c/reduce task/cancel state (task-workers task))))
; Pass construction
(defn pass
"Constructs a new linear or concurrent pass map over the given chunks, using
the given fold."
[fold chunks]
(let [fold-type (or (:pass-type fold)
(if (:combiner fold)
:concurrent
:linear))
result (promise)]
(assert+ (#{:linear :concurrent} fold-type))
{:type fold-type
:fold fold
:chunks chunks
:result result
:deliver (partial deliver result)}))
; Printing passes
(defn tasks-str
"Generates a string visualizing a series of tasks, prefixed with name."
[state name tasks]
(let [sb (StringBuilder.)]
(.append sb name)
(.append sb " [")
(loopr [i 0]
[task tasks]
(do (cond (= :cancelled task)
(.append sb "x ")
(not (task/has-task? state task))
(.append sb ". ")
true
(do (.append sb
(case (first (task/name task))
:join-combine ">"
:split-combine "<"
:join-reduce ">"
:split-reduce "<"
:combine "C"
:reduce "R"))
(.append sb (if (task-work-pending? state task)
" "
"!"))))
(recur (inc i))))
(.append sb "]")))
(defn pass-str
"Generates a string visualizing a pass. In its short form, takes a state and
a pass. Or takes a pass and a list of [name tasks] pairs."
[state pass-or-tasks]
(if (map? pass-or-tasks)
(let [pass pass-or-tasks
get-task (partial maybe-get-task state)
task-names (case (:type pass)
; Padding manually here so we'll line up with later
; calls that'll have longer names like :new-combine. Bit
; of a hack, but ah well.
:linear [[" reduce" :reduce-tasks]]
:concurrent [[" reduce" :reduce-tasks]
[" combine" :combine-tasks]])
tasks (mapv (fn [[short-name k]]
[short-name (mapv get-task (get pass k))])
task-names)]
(pass-str state tasks))
; [name task] pairs
(let [sb (StringBuilder.)
name-len (c/reduce max 0 (map (comp count name first) pass-or-tasks))
format-str (str "%1$" name-len "s")]
(loopr [i 0]
[[task-name tasks] pass-or-tasks]
(let [task-name (format format-str (name task-name))]
(.append sb (tasks-str state task-name tasks))
(when (< i (dec (count pass-or-tasks)))
(.append sb "\n"))
(recur (inc i))))
(str sb))))
; Launching a fresh pass
(defn launch-linear-pass
"Takes a task executor State and an unstarted linear pass. Launches all the
tasks required to execute this pass. Returns [state' pass']."
[state {:keys [fold chunks deliver] :as pass}]
(let [n (count chunks)]
(assert (pos? n))
(loop [i 0
state state
reduce-tasks []]
(if (< i n)
(let [[state reduce-task]
(if (= i 0)
(make-linear-reduce-task state fold chunks i)
(make-linear-reduce-task state fold chunks i
(nth reduce-tasks (dec i))))]
(recur (inc i) state (conj reduce-tasks reduce-task)))
; Done!
(let [[state combine-task]
(make-linear-combine-task state fold chunks (peek reduce-tasks))
[state deliver-task]
(make-deliver-task state fold combine-task deliver
(:executor pass))
pass (assoc pass
:reduce-tasks (mapv task/id reduce-tasks)
:combine-task (task/id combine-task)
:deliver-task (task/id deliver-task))]
; (info "Linear pass" (with-out-str (pprint pass)))
[state pass])))))
(defn launch-concurrent-pass
"Takes a task executor State and an unstarted concurrent pass. Launches all
the tasks required to execute this pass. Returns [state' pass']."
[state {:keys [chunks fold deliver] :as pass}]
(let [n (count chunks)]
(assert (pos? n))
; (info "Launching concurrent pass of" n "chunks")
(loop [i 0
state state
reduce-tasks []
combine-tasks []]
(if (< i n)
; Spawn tasks
(let [[state reduce-task]
(make-concurrent-reduce-task state fold chunks reduce-tasks i)
[state combine-task]
(if (= i 0)
(make-concurrent-combine-task state fold chunks i reduce-task)
(make-concurrent-combine-task state fold chunks i
(nth combine-tasks (dec i)) reduce-task))]
(recur (inc i)
state
(conj reduce-tasks reduce-task)
(conj combine-tasks combine-task)))
; Done!
(let [[state deliver-task]
(make-deliver-task state fold (nth combine-tasks (dec n))
deliver (:executor pass))]
[state
(assoc pass
:reduce-tasks (mapv task/id reduce-tasks)
:combine-tasks (mapv task/id combine-tasks)
:deliver-task (task/id deliver-task))])))))
(defn launch-pass
"Takes a task executor state and an unstarted pass, then launches its task,
returning [state' pass']"
[state pass]
(case (:type pass)
:linear (launch-linear-pass state pass)
:concurrent (launch-concurrent-pass state pass)))
; Joining one pass to another
; Different ways we can derive joined reducer/combiner tasks from old+new
; ones and vice-versa. Chunks go right to left. Within a single chunk, we can
; join or split accumulators.
;
; join split
;
; old reducers +-r r<+
; | |
; old combiners | c-+ +>c |
; | | | |
; reducers +>r | | r-+
; | | | |
; combiners | c<+ +-c |
; | | | |
; new combiners | c-+ +>c |
; | |
; new reducers +-r r<+
;
(defn join-linear-pass
"Takes a task executor State and a pair of passes: the first (potentially)
running, the second fresh. Joins these passes into a new pass which tries to
merge as much work as possible into fused reduce/combine operations. Returns
[state' joined-pass]."
[state old-pass new-pass]
(let [chunks (:chunks old-pass)
n (count chunks)
;_ (assert (identical? chunks (:chunks new-pass)))
old-fold (:fold old-pass)
new-fold (:fold new-pass)
;_ (info (str "Joining linear pass " (:name old-fold) " with "
; (:name new-fold) " over " n " chunks:\n"
; (pass-str state old-pass)))
; Fuse folds
{:keys [fused join-accs split-accs]} (fuse old-fold new-fold)
old-reduce-tasks (object-array
(mapv (partial maybe-get-task state)
(:reduce-tasks old-pass)))
old-combine-task (task/get-task state (:combine-task old-pass))
old-deliver-task (task/get-task state (:deliver-task old-pass))
; Zip backwards through reduce tasks looking for a join point: the
; chunk where we start doing fused reduce tasks. We need every old
; reduce task to be cancellable to this point, and we *also* need the
; previous old reduce task to exist so we can chain on to it.
join-i (loop [i (dec n)]
(if (< i 0)
0
(if (and (or (= i 0)
(not (nil? (aget old-reduce-tasks (dec i)))))
(when-let [task (aget old-reduce-tasks i)]
(task-work-pending? state task)))
(recur (dec i)) ; Good, keep going
(inc i))))] ; Whoops, too far.
; We join at 0 if replacing every task. n-1 if only replacing the last
; task. n if no tasks are replacable.
(if (= n join-i)
; Nothing we can cancel!
(launch-linear-pass state new-pass)
; Cancel that task. That'll transitively cancel the rest of the pass.
(let [state (task/cancel state (nth old-reduce-tasks join-i))
; Just for visualization
_ (loop [i join-i]
(when (< i n)
(aset old-reduce-tasks i :cancelled)
(recur (inc i))))
new-reduce-tasks (object-array n)
reduce-tasks (object-array n)
; Spin up new reduce tasks up to but not including the join point
state
(loop [i 0, state state]
(if (<= join-i i)
state
(let [[state task]
(if (= i 0)
(make-linear-reduce-task state new-fold chunks i)
(make-linear-reduce-task
state new-fold chunks i
(aget new-reduce-tasks (dec i))))]
(aset new-reduce-tasks i task)
(recur (inc i) state))))
; Just before the join point, merge old and new reducer state.
state (if (= join-i 0)
; Every task was cancellable; no need to join.
state
; We need to join
(let [i (dec join-i)
[state task]
(make-join-task state [:join-reduce] join-accs
(aget old-reduce-tasks i)
(aget new-reduce-tasks i))]
(aset reduce-tasks i task)
state))
; Then create new fused reduce tasks from that point on.
state
(loop [i join-i
state state]
(if (= i n)
state
(let [[state task]
(if (= i 0)
(make-linear-reduce-task state fused chunks i)
(make-linear-reduce-task
state fused chunks i
(aget reduce-tasks (dec i))))]
(aset reduce-tasks i task)
(recur (inc i) state))))
; Add combine task
[state combine-task]