Simplify main fetching/sorting logic

Reduced the amount of go-routines. Now sorting
is done in the same go-routine as URLs are fetched.

Then these sorted lists are merged one by one
in a loop as they arrive.

This greatly reduces complexity.
This commit is contained in:
Julian Ospald 2017-09-11 15:52:26 +02:00
parent ebbcd71b3d
commit 9476a00648
1 changed files with 25 additions and 57 deletions

View File

@ -62,25 +62,29 @@ func numbersHandler(w http.ResponseWriter, r *http.Request) {
return
}
// Non-blocking input channel for URL results.
// We will read as much as we can at once.
inputChan := make(chan []int, len(rurl))
// Non-blocking channel for sorted results
sortChan := make(chan []int, len(rurl))
var wg sync.WaitGroup
// fetch all URLs asynchronously
// fetch all URLs asynchronously and sort them
for i := range rurl {
wg.Add(1)
go func(url string) {
defer wg.Done()
n, e := getNumbers(url, ctx)
if e == nil {
if n != nil && len(n) > 0 {
inputChan <- n
} else {
if e == nil { // we have a usable JSON list of numbers
if n != nil && len(n) > 0 { // the list of numbers is non-empty
sorted, err := sort.SortedAndDedup(ctx, n)
if err != nil { // sorting threw an error
log.Printf("Sorting took too long, ignoring list")
return
}
sortChan <- sorted
} else { // empty list of numbers
log.Printf("Received empty list of numbers from endpoint")
}
} else {
} else { // no usable JSON/other error
log.Printf("Got an error: %s", e)
}
}(rurl[i])
@ -88,69 +92,33 @@ func numbersHandler(w http.ResponseWriter, r *http.Request) {
// master routine closing the inputChan
go func() {
wg.Wait()
close(inputChan)
close(sortChan)
}()
// channel for sorting process, so we can short-circuit in
// case sorting takes too long
sortChan := make(chan []int, 1)
// aggregate numbers from URLs
var numberBuffer []int = []int{}
// these are actually sorted
var sortedNumbers []int = []int{}
// result list that is returned on short-circuit
var resultList []int = []int{}
// aggregate and sort loop,
// breaks if all URLs have been processed or the timeout
// has been reached
// get all sorted lists concurrently and merge them as we go
done := false
for done != true {
select {
case <-ctx.Done():
log.Printf("Waiting for URL took too long, finishing response anyway")
finishResponse(w, sortedNumbers)
finishResponse(w, resultList)
return
case res, more := <-inputChan:
if more { // still URLs to fetch
numberBuffer = append(numberBuffer, res...)
// continue to aggregate numbers from the buffer
continue
} else { // all URLs fetched, sort and be done
log.Printf("Nothing else to fetch")
case res, more := <-sortChan:
if more {
merged := sort.MergeLists(resultList, res)
resultList = merged
} else {
log.Printf("Nothing else to sort")
done = true
}
// non-blocking branch that sorts what we already have
// we are not done here yet
default:
// only sort if we have new results
if len(numberBuffer) == 0 {
continue
}
}
// sort fallthrough, either the inputChan is currently "empty"
// or we fetched all URLs already
go func(sorted []int, unsorted_buffer []int) {
res, err := sort.SortedAndDedup(ctx, unsorted_buffer)
if err != nil {
return
}
merged := sort.MergeLists(sortedNumbers, res)
sortChan <- merged
}(sortedNumbers, numberBuffer)
numberBuffer = []int{}
select {
case merged := <-sortChan:
sortedNumbers = merged
case <-ctx.Done():
log.Printf("Sorting took too long, finishing response anyway")
finishResponse(w, sortedNumbers)
return
}
}
log.Printf("Result is complete, finishing response")
finishResponse(w, sortedNumbers)
finishResponse(w, resultList)
}
// Finalizes the JSON response with the given numbers. This always