Go fan-out task processor
-
// Job is one unit of work moving through the pipeline: a worker calls
// Process on it, and the printer then calls Print on the finished job.
type Job interface {
	Process()
	Print()
}

// ===============
// Change this

// HiJob is the sample Job implementation; replace it with a real one.
type HiJob struct {
	name string
}

// Process is where the job's actual work belongs; the sample does nothing.
func (j *HiJob) Process() {
	// do something
}

// Print writes a greeting for the job's name to standard output.
func (j *HiJob) Print() {
	fmt.Printf("Hi, %s\n", j.name)
}

// ===============

// runWorkers starts count goroutines that each receive jobs from pending,
// call Process on them and forward them to complete. It blocks until pending
// has been closed and every worker has exited, then closes complete so the
// consumer's range loop terminates.
func runWorkers(pending <-chan Job, complete chan<- Job, count int) {
	var wg sync.WaitGroup
	for i := 0; i < count; i++ {
		// Add must happen before the goroutine starts; otherwise Done (or
		// Wait) could run first and race with the counter.
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range pending {
				job.Process()
				complete <- job
			}
		}()
	}
	wg.Wait()
	close(complete)
}

// main wires the pipeline together: a reader turns stdin lines into jobs,
// a pool of workers processes them, and a printer reports each finished job.
func main() {
	// increase count as needed
	const workers = 10

	pending := make(chan Job)
	complete := make(chan Job)

	// Reader
	go func() {
		// The reader is the only sender on pending, so it is the one to close it.
		defer close(pending)

		var line string
		for {
			// Scanln reads a single space-delimited token per line; EOF, a
			// blank line, or extra words on a line return an error and end input.
			if _, err := fmt.Scanln(&line); err != nil {
				return
			}
			pending <- &HiJob{name: line} // create Jobs here
		}
	}()

	// Printer
	printed := make(chan struct{})
	go func() {
		defer close(printed)
		for completeJob := range complete {
			completeJob.Print()
		}
	}()

	// Workers
	runWorkers(pending, complete, workers)

	// Wait for the printer to drain complete; returning earlier would let the
	// program exit before the last jobs are printed.
	<-printed
}
... Goddamn it, Eclipse, did you put in tabs again?
-
Example usage:
// DomainLookupJob resolves a single domain name and keeps the outcome:
// either the chosen address in result or the failure in err.
type DomainLookupJob struct {
	domain string
	result net.IP
	err    error
}

// Process looks up j.domain with net.LookupIP. On success it stores the first
// address returned; if the lookup fails or yields no addresses it clears
// result and records the error instead.
func (j *DomainLookupJob) Process() {
	addrs, lookupErr := net.LookupIP(j.domain)
	switch {
	case lookupErr != nil:
		j.result, j.err = nil, lookupErr
	case len(addrs) == 0:
		j.result, j.err = nil, errors.New("No A or AAAA record found")
	default:
		// Only the first address is kept; any others are ignored.
		j.result = addrs[0]
	}
}

// Print writes one line to standard output: "<domain> <ip>" when the lookup
// succeeded, or "<domain> ERROR <error>" when it did not.
func (j *DomainLookupJob) Print() {
	if j.err == nil {
		fmt.Printf("%s %s\n", j.domain, j.result.String())
		return
	}
	fmt.Printf("%s ERROR %s\n", j.domain, j.err)
}

// NewDomainLookupJob returns a job for domain with no result or error set yet.
func NewDomainLookupJob(domain string) *DomainLookupJob {
	return &DomainLookupJob{domain: domain}
}
// Use this as the send in main's Reader goroutine (the spot marked "create Jobs here").
pending <- NewDomainLookupJob(line)
# Pipe the list of domains, one per line, into the program's standard input.
cat domains | go run domainlookup.go