Listener Package
To make this routine more general, we pass in the handler for the client to run and the metrics object to be updated. So that we can exit cleanly, we also pass in the context that tells us when to quit, and a wait group through which we indicate that we are done.
The listener package gets touched by a lot of the addons that we are discussing.
- Most of the metrics get set and updated in here.
- The Ctrl-C gets used in here
- A chain of waiting happens here. This will wait for all client handlers to exit, and then inform our parent that we are done.
Tip
Note that the handler is defined as `handler func(io.ReadWriteCloser) error`. I define the connection’s type as an `io.ReadWriteCloser` instead of `net.Conn`. The handlers don’t need the extra values available on the `net.Conn`. I remove (actually just hide) them so that I don’t have to consider them during testing or debugging.
Plain
Take all the addons away and we return to the minimal version:
// Run is the minimal listener: it opens a TCP listen socket on address and
// passes every accepted connection, together with handler, to
// spawnClientHandler. The loop has no exit condition.
//
// The errors returned by net.Listen and Accept are discarded in this minimal
// version, so a failed Listen leaves listener nil.
func Run(address string, handler func(io.ReadWriteCloser) error) {
listener, _ := net.Listen("tcp", address)
defer listener.Close()
for {
connection, _ := listener.Accept()
// NOTE(review): two-argument call; presumably a spawnClientHandler variant
// without the metrics and wait-group parameters. Confirm.
spawnClientHandler(connection, handler)
}
}
Full
The full code for the package
1package listener
2
3import (
4 "context"
5 "expvar"
6 "io"
7 "log"
8 "net"
9 "sync"
10)
11
12func Run(
13 ctx context.Context,
14 address string,
15 handler func(io.ReadWriteCloser) error,
16 wg *sync.WaitGroup,
17 metrics *expvar.Map) {
18
19 initializeMetrics(metrics, address)
20
21 // Create group to track our client handlers. When leaving, wait for the
22 // clients then tell our parent that we are done.
23 var clientsWg sync.WaitGroup
24 defer func() {
25 clientsWg.Wait()
26 wg.Done()
27 }()
28
29 // Open the listen port. I could get errors if, for example, the port is
30 // already in use
31 listener, err := net.Listen("tcp", address)
32 if err != nil {
33 log.Printf("Error opening listen socket %q: %s", address, err.Error())
34 return
35 }
36 defer listener.Close()
37
38 // The best way to break out of the Accept is to close the socket. If the
39 // context gets canceled, we close the socket so we can cleanup.
40 go func() {
41 <-ctx.Done()
42 listener.Close()
43 }()
44
45
46 metrics.Add("listening", 1)
47 defer metrics.Add("listening", -1)
48
49 for {
50 connection, err := listener.Accept()
51 if err != nil {
52 log.Printf("Error in accept on %q: %s", address, err)
53 return
54 }
55
56 spawnClientHandler(connection, handler, metrics, &clientsWg)
57 }
58}
59
// spawnClientHandler runs handler for one accepted connection in its own
// goroutine and maintains the per-client metrics around it.
//
// clientsWg is incremented before the goroutine starts and decremented when
// the handler returns. While the handler runs, "busy" is raised by one;
// "count" is incremented once per client, and "error" once for every handler
// call that returns a non-nil error. The returned error itself is not logged
// or propagated.
func spawnClientHandler(
	connection net.Conn,
	handler func(io.ReadWriteCloser) error,
	metrics *expvar.Map,
	clientsWg *sync.WaitGroup) {

	serve := func() {
		defer clientsWg.Done()

		metrics.Add("busy", 1)
		metrics.Add("count", 1)
		defer metrics.Add("busy", -1)

		if failure := handler(connection); failure != nil {
			metrics.Add("error", 1)
		}
	}

	// Register with the wait group before launching, so a concurrent Wait
	// cannot miss this client.
	clientsWg.Add(1)
	go serve()
}
81
// initializeMetrics makes sure every counter the listener reports exists in
// metrics, and records the listen address under the "address" key.
//
// Each counter is touched with a delta of zero, so a counter that already
// holds a value keeps it. The "address" entry is replaced on every call.
func initializeMetrics(metrics *expvar.Map, address string) {
	for _, counter := range []string{"busy", "count", "error", "listening"} {
		metrics.Add(counter, 0)
	}

	var listenAddr expvar.String
	listenAddr.Set(address)
	metrics.Set("address", &listenAddr)
}