Sam
0
Q:

reduce ruby baud rate

# frozen_string_literal: true

require "ostruct"

# A tiny map/reduce helper: blocks are registered with #map and only executed,
# one after another in registration order, when #reduce is called.
class MyQueue
  # Wraps a block so it can be invoked later via #call.
  class Worker
    def initialize(&job)
      @job = job
    end

    # Runs the stored block and returns whatever it returns.
    def call
      @job.call
    end
  end

  # Wraps the collection produced by MyQueue#reduce.
  class Result
    # @param merged [#all?] the accumulated worker results
    def initialize(merged)
      @merged = merged
    end

    # @return [Boolean] true when every accumulated entry reports a truthy
    #   +in_sync+ (vacuously true for an empty collection)
    def in_sync?
      @merged.all? { |entry| entry.in_sync }
    end
  end

  def initialize
    @workers = []
  end

  # Registers a block to be run later by #reduce. The block is not executed here.
  #
  # @return [Array<Worker>] the internal list of registered workers
  def map(&blk)
    @workers << Worker.new(&blk)
  end

  # Runs every registered worker in order and yields each worker's return
  # value together with +accumulator+. The block's return value is ignored;
  # the block is expected to mutate +accumulator+ in place.
  #
  # @param accumulator [Object] object handed to the block on every iteration
  # @yieldparam value [Object] the return value of one worker
  # @yieldparam acc [Object] the same +accumulator+ object each time
  # @return [Result] a Result wrapping +accumulator+
  def reduce(accumulator)
    @workers.each do |job|
      yield job.call, accumulator
    end

    Result.new(accumulator)
  end
end

# Demo driver: enqueue one simulated query per server, then collect the results.
servers = %w[server1.test server2.test server3.test]
queue = MyQueue.new

# Register one deferred worker per server; nothing runs until #reduce below.
servers.each do |server|
  queue.map do
    # Random pause standing in for network latency.
    sleep rand(0..3)
    puts "#{Time.now.utc} - querying: #{server}"

    # The worker's return value: which server was queried and its sync status.
    OpenStruct.new(name: server, in_sync: true)
  end
end

# Run the workers in order, gathering every result object into one array.
result = queue.reduce([]) { |server, memo| memo.push(server) }

puts "in sync: #{result.in_sync?}"
0
# A worker that starts running its block on a new Thread as soon as it is
# constructed; #call then waits for that thread and hands back its result.
class AsyncWorker
  # Starts the given block on a new thread immediately.
  #
  # @raise [ThreadError] if no block is given (raised by Thread.new)
  def initialize(&blk)
    @thread = Thread.new(&blk)
  end

  # Blocks until the thread has finished and returns the block's value.
  # Thread#value joins the thread itself, so no separate #join is needed.
  # An exception that terminated the thread is re-raised here.
  #
  # @return [Object] the value returned by the block
  def call
    @thread.value
  end
end
0
# frozen_string_literal: true

require "ostruct"

# A tiny map/reduce helper: blocks are registered with #map and only executed,
# one after another in registration order, when #reduce is called.
class MyQueue
  # Wraps a block so it can be invoked later via #call.
  class Worker
    def initialize(&job)
      @job = job
    end

    # Runs the stored block and returns whatever it returns.
    def call
      @job.call
    end
  end

  # Wraps the collection produced by MyQueue#reduce.
  class Result
    # @param merged [#all?, #map] the accumulated worker results
    def initialize(merged)
      @merged = merged
    end

    # @return [Boolean] true when every accumulated entry reports a truthy
    #   +in_sync+ (vacuously true for an empty collection)
    def in_sync?
      @merged.all? { |entry| entry.in_sync }
    end

    # NEW METHOD
    # Collects each entry's +error+, dropping entries whose error is nil.
    #
    # @return [Array] the non-nil error values, in accumulation order
    def errors
      collected = @merged.map { |entry| entry.error }
      collected.compact
    end
  end

  def initialize
    @workers = []
  end

  # Registers a block to be run later by #reduce. The block is not executed here.
  #
  # @return [Array<Worker>] the internal list of registered workers
  def map(&blk)
    @workers << Worker.new(&blk)
  end

  # Runs every registered worker in order and yields each worker's return
  # value together with +accumulator+. The block's return value is ignored;
  # the block is expected to mutate +accumulator+ in place.
  #
  # @param accumulator [Object] object handed to the block on every iteration
  # @yieldparam value [Object] the return value of one worker
  # @yieldparam acc [Object] the same +accumulator+ object each time
  # @return [Result] a Result wrapping +accumulator+
  def reduce(accumulator)
    @workers.each do |job|
      yield job.call, accumulator
    end

    Result.new(accumulator)
  end
end

# NEW OBJECT
# Simulates querying a single server, including a random delay and an
# occasional random failure, and reports the outcome as an OpenStruct.
class Processor
  # Performs the simulated query for +server+.
  #
  # On success the returned struct has +name+ set to +server+ and +in_sync+
  # set to true. If a StandardError is raised along the way it is swallowed:
  # the struct is returned with +in_sync+ set to false and +error+ set to the
  # exception's message.
  #
  # @param server [Object] identifier of the server being queried
  # @return [OpenStruct] the outcome of the query
  def call(server)
    result = OpenStruct.new(name: server, in_sync: true)

    # Random pause standing in for network latency.
    delay = rand(0..3)
    sleep delay
    puts "#{Time.now.utc} - querying: #{server}"

    # Fail whenever the random draw from 0...10 comes up 0.
    failed = rand(10) < 1
    raise RuntimeError, "boom #{server}" if failed

    result
  rescue StandardError => e
    result.tap do |failure|
      failure.in_sync = false
      failure.error = e.message
    end
  end
end

# Demo driver: enqueue one Processor call per server, collect the results,
# and report whether every server is in sync.
servers = %w[server1.test server2.test server3.test]
queue = MyQueue.new
processor = Processor.new

# MOVED LOGIC INTO Processor#call
servers.each do |server|
  queue.map { processor.call(server) }
end

# Run the workers in order, gathering every result object into one array.
result = queue.reduce([]) { |server, memo| memo.push(server) }

summary =
  if result.in_sync?
    "in sync: true"
  else
    "in sync: false - errors: #{result.errors.join(', ')}"
  end
puts summary
0

New to Communities?

Join the community