Scientific Computing, TU Berlin, WS 2019/2020, Lecture 26
Jürgen Fuhrmann, WIAS Berlin
Threads.@spawn
. wait(task)
: wait for the completion of the task fetch(task)
wait for the completion of the taks and retrieve result using BenchmarkTools
using LinearAlgebra
using Printf
using PyPlot
injupyter()=isdefined(Main, :IJulia) && Main.IJulia.inited
JULIA_NUM_THREADS
set to the desired number
Threads.nthreads()
spawn()
¶A task which just returns the number of the thread executing it
function mytask()
return Threads.threadid()
end
function threads_hello(;ntasks=10)
println("number of tasks: $(ntasks)")
tasks=[Threads.@spawn mytask() for i=1:ntasks]
for i=1:length(tasks)
ithd=fetch(tasks[i])
println("task #$(i) was executed thread #$(ithd)")
end
end
threads_hello(ntasks=10)
function partition(N,ntasks)
loop_begin=zeros(Int64,ntasks)
loop_end=zeros(Int64,ntasks)
for itask=1:ntasks
ltask=Int(floor(N/ntasks))
loop_begin[itask]=(itask-1)*ltask+1
if itask==ntasks # adjust last task length
ltask=N-(ltask*(ntasks-1))
end
loop_end[itask]=loop_begin[itask]+ltask-1
end
return (loop_begin,loop_end)
end
Check this
partition(100000,3)
Calculate part of scalar product from n0 to n1
function mydot(a,b,n0,n1)
result=0.0
for i=n0:n1
result+=a[i]*b[i]
end
result
end
Calculate scalar product in parallell
function threaded_mydot(a,b,N,ntasks)
loop_begin,loop_end=partition(N,ntasks)
tasks=[Threads.@spawn mydot(a,b,loop_begin[i],loop_end[i]) for i=1:ntasks]
return mapreduce(task->fetch(task),+,tasks)
end
Compare times, check accuracy
function test_threaded_mydot(;N=400000, ntasks=4)
a=rand(N)
b=rand(N)
res_s=@btime mydot($a,$b,1,$N)
res_p=@btime threaded_mydot($a,$b,$N,$ntasks)
res_s ≈ res_p
end
test_threaded_mydot(N=400000, ntasks=4)
The fork-join model promises that we can skip the scheduling "by hand"
function forkjoin_mydot_primitive(a,b)
result=0.0
N=length(a)
Threads.@threads for i=1:N
result+=a[i]*b[i]
end
result
end
function test_forkjoin_mydot_primitive(;N=400000)
a=rand(N)
b=rand(N)
res_s=@btime mydot($a,$b,1,$N)
res_p=@btime forkjoin_mydot_primitive($a,$b)
res_s ≈ res_p
end
test_forkjoin_mydot_primitive(N=400000)
What was wrong ?
result
:
result+=x
result+=y
result
and gets its actual value r0
result
and gets its actual value r0
r0+x
r0+y
result=r0+y
instead of the correct
value r0+x+y
function forkjoin_mydot_atomic(a,b)
result = Threads.Atomic{Float64}(0.0)
N=length(a)
Threads.@threads for i=1:N
Threads.atomic_add!(result,a[i]*b[i])
end
return result[]
end
function test_forkjoin_mydot_atomic(;N=400000)
a=rand(N)
b=rand(N)
res_s=@btime mydot($a,$b,1,$N)
res_p=@btime forkjoin_mydot_atomic($a,$b)
res_s ≈ res_p
end
test_forkjoin_mydot_atomic(N=400000)
... at least it is correct
function forkjoin_mydot_reduction(a,b)
N=length(a)
result=zeros(Threads.nthreads())
Threads.@threads for i=1:N
ithd=Threads.threadid()
result[ithd]+=a[i]*b[i]
end
return sum(result)
end
function test_forkjoin_mydot_reduction(;N=400000)
a=rand(N)
b=rand(N)
res_s=@btime mydot($a,$b,1,$N)
res_p=@btime forkjoin_mydot_reduction($a,$b)
res_s ≈ res_p
end
test_forkjoin_mydot_reduction(N=400000)
d[i]=a[i]+b[i]*c[i]
function vtriad(N,nrepeat)
a = Array{Float64,1}(undef,N)
b = Array{Float64,1}(undef,N)
c = Array{Float64,1}(undef,N)
d = Array{Float64,1}(undef,N)
Threads.@threads for i=1:N
a[i]=i
b[i]=N-i
c[i]=i
d[i]=-i
end
t_scalar=@elapsed begin
@inbounds @fastmath for j=1:nrepeat
for i=1:N
d[i]=a[i]+b[i]*c[i]
end
end
end
t_parallel=@elapsed begin
for j=1:nrepeat
Threads.@threads for i=1:N
@inbounds @fastmath d[i]=a[i]+b[i]*c[i]
end
end
end
GFlops=N*nrepeat*2.0/1.0e9
@printf("% 10d % 10.3f % 10.3f % 10.3f\n", N, GFlops/t_scalar,GFlops/t_parallel,t_scalar/t_parallel)
return [N, GFlops/t_scalar,GFlops/t_parallel,t_scalar/t_parallel]
GC.gc()
end
function run_triad()
# Approximate number of FLOPs per measurement
flopcount=5.0e8
# Smallest array size
N0=1000
# Data points per decade (of array size)
ppdec=8
# Number of array size increases
nrun=40
# File header
@printf("# nthreads=%d\n",Threads.nthreads())
@printf("# N S_GFlops/s P_GFlops/s speedup\n")
# Loop over measurements
N=N0
result=zeros(4,nrun)
for irun=0:nrun-1
# Have exact powers of 10 in the measurement
if (irun%ppdec==0)
N=N0
N0*=10
end
# Calculate number of repeats so that overall effort stays constant
nrepeat=flopcount/N
result[:,irun+1].=vtriad(Int(ceil(N)),Int(ceil(nrepeat)))
N=N*10^(1.0/ppdec)
end
PyPlot.figure(1)
PyPlot.semilogx(result[1,:], result[2,:])
PyPlot.grid()
PyPlot.ylim(1,20)
PyPlot.xlabel("Array Size")
PyPlot.ylabel("GFlops/s")
end
run_triad()
This notebook was generated using Literate.jl.