diff --git a/src/numbers/numbers.go b/src/numbers/numbers.go index 3020ab6..eb5471f 100644 --- a/src/numbers/numbers.go +++ b/src/numbers/numbers.go @@ -62,25 +62,29 @@ func numbersHandler(w http.ResponseWriter, r *http.Request) { return } - // Non-blocking input channel for URL results. - // We will read as much as we can at once. - inputChan := make(chan []int, len(rurl)) + // Non-blocking channel for sorted results + sortChan := make(chan []int, len(rurl)) var wg sync.WaitGroup - // fetch all URLs asynchronously + // fetch all URLs asynchronously and sort them for i := range rurl { wg.Add(1) go func(url string) { defer wg.Done() n, e := getNumbers(url, ctx) - if e == nil { - if n != nil && len(n) > 0 { - inputChan <- n - } else { + if e == nil { // we have a usable JSON list of numbers + if n != nil && len(n) > 0 { // the list of numbers is non-empty + sorted, err := sort.SortedAndDedup(ctx, n) + if err != nil { // sorting threw an error + log.Printf("Sorting took too long, ignoring list") + return + } + sortChan <- sorted + } else { // empty list of numbers log.Printf("Received empty list of numbers from endpoint") } - } else { + } else { // no usable JSON/other error log.Printf("Got an error: %s", e) } }(rurl[i]) @@ -88,69 +92,33 @@ func numbersHandler(w http.ResponseWriter, r *http.Request) { // master routine closing the inputChan go func() { wg.Wait() - close(inputChan) + close(sortChan) }() - // channel for sorting process, so we can short-circuit in - // case sorting takes too long - sortChan := make(chan []int, 1) - // aggregate numbers from URLs - var numberBuffer []int = []int{} - // these are actually sorted - var sortedNumbers []int = []int{} + // result list that is returned on short-circuit + var resultList []int = []int{} - // aggregate and sort loop, - // breaks if all URLs have been processed or the timeout - // has been reached + // get all sorted lists concurrently and merge them as we go done := false for done != true { select { case <-ctx.Done(): log.Printf("Waiting for URL took too long, finishing response anyway") - finishResponse(w, sortedNumbers) + finishResponse(w, resultList) return - case res, more := <-inputChan: - if more { // still URLs to fetch - numberBuffer = append(numberBuffer, res...) - // continue to aggregate numbers from the buffer - continue - } else { // all URLs fetched, sort and be done - log.Printf("Nothing else to fetch") + case res, more := <-sortChan: + if more { + merged := sort.MergeLists(resultList, res) + resultList = merged + } else { + log.Printf("Nothing else to sort") done = true } - // non-blocking branch that sorts what we already have - // we are not done here yet - default: - // only sort if we have new results - if len(numberBuffer) == 0 { - continue - } - } - - // sort fallthrough, either the inputChan is currently "empty" - // or we fetched all URLs already - go func(sorted []int, unsorted_buffer []int) { - res, err := sort.SortedAndDedup(ctx, unsorted_buffer) - if err != nil { - return - } - merged := sort.MergeLists(sortedNumbers, res) - sortChan <- merged - }(sortedNumbers, numberBuffer) - numberBuffer = []int{} - - select { - case merged := <-sortChan: - sortedNumbers = merged - case <-ctx.Done(): - log.Printf("Sorting took too long, finishing response anyway") - finishResponse(w, sortedNumbers) - return } } log.Printf("Result is complete, finishing response") - finishResponse(w, sortedNumbers) + finishResponse(w, resultList) } // Finalizes the JSON response with the given numbers. This always