В одной из последних итераций было принято решение переводить MS на асинхронный формат. Если раньше постановка задач на MS шла только через http, то теперь все задачи стали отправляться в rabbitMQ (RM -> MS), а ответы пересылались по другой очереди (MS -> RM).
На фронте очень многое было заточено именно на http запросы (почему — уже отдельный разговор), поэтому оперативно и без костылей перестроиться под новые реалии MS не получалось. Именно поэтому и было решено расширить RM так, чтобы он делал следующее:
Получаем запрос от фронта (http);
Отправляем данные на MS (mq);
Получаем данные от MS (mq);
Отправляем ответ на http запрос от фронта.
В данной статье я хочу рассказать о простом и элегантном решении данной проблемы.
В данном примере я специально опущу обработку ошибок, получение данных из rabbit и прочие операции, которые не важны в контексте данной статьи.
Для передачи данных между горутинами лучше всего подходят каналы, поэтому первым делом создадим map наших каналов
var AsyncChans = make(map[string]chan []byte)
У нас есть некий http роут, который принимает данные в виде map[string]interface{}:
var request map[string]interface{}
json.Unmarshal(body, &request)
Для начала нам нужен уникальный идентификатор, чтобы была возможность узнать на какое из сообщений пришел ответ. Я буду использовать uuid. Создаем его и добавляем в реквест:
commandID, _ := uuid.NewRandom()
request["commandID"] = commandID.String()
А теперь, вместо того, чтобы получить данные сразу (как было по http)
data := SendTasks(jsonByte)
Мы вначале создаем новый канал в нашей map, отправляем данные в горутине и начинаем ждать ответ уже не от функции SendTasks, а из канала:
var AsyncChans = make(map[string]chan []byte)
// go в начале добавлено для того, чтобы явно показать что функция вызывается асинхронно.
go SendTasks(jsonByte)
data := <-AsyncChans[commandID.String()]
Теперь осталось немного модифицировать получение ответов:
for m := range msgs {
var request map[string]string
json.Unmarshal(m.body, &request)
AsyncChans[request["commandID"]] <- m.body
}
Отлично. Базовая версия готова. Теперь при http запросе наш сервис будет отправлять сообщение в rabbit, ждать ответа, и отдавать его в качестве ответа на http запрос.
Давайте разберемся почему так происходит. Вспомним свойство небуферизированных каналов — они блокируют выполнение горутины до тех пор, пока другая горутина не передаст в них что либо, или пока они не будут явно закрыты.
Получается что вот этой строчкой
data := <-AsyncChans[commandID.String()]
мы блокируем наш http запрос — он просто висит и ждет, а вот этой строкой
AsyncChans[request["commandID"]] <- m.body
мы его разблокируем, отправляя в канал данные, которые http запрос отдаст в качестве ответа:
Все это выглядит хорошо, и даже работает, но до тех пор, пока в один прекрасный момент MS не упадет. Он, конечно, поднимется, но шанс, что он потеряет в процессе выполнения одну из команд, очень велик. А это значит, что на какой-то commandID ответ не придет никогда, и мы получим горутину которая будет вечно висеть, пока мы не перезапустим наш сервис.
С этим нужно что-то делать и на помощь нам приходят контекст с таймаутом:
timeout := time.Second * 10
ctx, cancel := context.WithTimeout(context.Background(), timeout)
Если кто забыл что это, и как они работают, то кратко напомню — контекст с таймаутом пишет в канал своей структуры (ctx.Done()) в том случае, когда либо прошел таймаут после инициализации контекста (в нашем случае 10 секунд), либо была вызвана функция отмены (в нашем случае cancel()).
Контекст есть, теперь нужно слушать его канал. Делаем это в отдельной горутине сразу следом за объявлением контекста:
go func(ctx context.Context) {
<-ctx.Done()
close(AsyncChans[commandID.String()])
delete(AsyncChans, commandID.String())
}(ctx)
В данной горутине происходит следующее — канал на 2 строке примера блокирует выполнение горутины и она начинает ждать. А как я уже писал выше — ждет она либо когда пройдет 10 секунд, либо когда будет вызвана функция cancel().
Также, нам необходимо добавить вызов функции отмены контекста после успешного получения данных и обработку закрытия канала. Напомню, что при получении данных из канала есть второй аргумент. Он равен false в случае, если пытаемся прочитать сообщения из пустого и закрытого канала:
data, ok := <-AsyncChans[commandID.String()]
cancel()
И получаем следующий код:
AsyncChans[commandID.String()] = make(chan []byte)
go SendTasks(jsonByte)
timeout := time.Second * 10
ctx, cancel := context.WithTimeout(context.Background(), timeout)
go func(ctx context.Context) {
<-ctx.Done()
close(AsyncChans[commandID.String()])
delete(AsyncChans, commandID.String())
}(ctx)
data, ok := <-AsyncChans[commandID.String()]
cancel()
Как же именно он поможет решить проблему отсутствия ответа — давайте разбираться:
- В случае, если у нас есть ответ, то мы сохраняем его в data и вызываем функцию отмены. В горутине закрывается канал, и удаляется из map всех каналов.
- В случае, если у нас нет ответа, то через 10 секунд горутина разблокируется, закрывает канал, и удаляет его из map. Как только канал закрыт, ожидающий данных канал разблокирует горутину, но data у нас равно nil, а ok — false. Теперь вызываем функцию отмены контекста, но так как он уже закрыт, ничего не происходит.
Получается, что максимум, сколько мы будем ждать ответ, это 10 секунд. В случае, если через 10 секунд ответ не получен, то горутина продолжит свою работу, а не зависнет навсегда. Осталось только это обработать и показать фронту что у нас 504:
if !ok {
return errors.New("504 Gateway Time-out")
}
На этом все. Данный способ подходит для взаимодействия любых синхронных и асинхронных сервисов. Он достаточно лаконичный и не требует больших правок в кодовой базе. Спасибо за внимание.
Специально для сайта ITWORLD.UZ. Новость взята с сайта Хабр