Acknowledgement system on stream



Opened streams don’t mean the core is actually ready to post message to them. That’s why sometimes clients expect to get message from streams but they are actually empty.

For example, in the following implementation of ListenResult, a lot of stuff is done before the listener (ln) is actually created and ready to receive message.

One suggestion that could solve it is to add an acknowledgement system:

  • core send a ready message when the client open a stream. client wait for this message to continue. if it doesn’t receive a message because a certain timeout, it should return an error.
  • core send a closed message just before the core do a “graceful” close of the stream. if the client doesn’t receive this message and the stream has closed, it retries the connection.

It seems that gRPC metadata is a perfect solutions for this system:

I created a proof of concept that implement the ready message by using gRPC header in around 10 lines of code for both server and client implementation:


func (s *Server) ListenResult(request *coreapi.ListenResultRequest, stream coreapi.Core_ListenResultServer) error {
	// Do stuff

 	// send header to confirm to client that the server is ready
	if err := stream.SendHeader(metadata.Pairs("status", "ready")); err != nil {
		return err

	// send message to stream


// ServiceListenResults returns a channel with event results streaming..
func (p *ServiceProvider) ServiceListenResults(id, taskFilter, outputFilter string, tagFilters []string) (chan *coreapi.ResultData, chan error, error) {
	// create stream

	// wait for server to return its ready
	meta, err := stream.Header()
	if err != nil {
		return nil, nil, err
	statuses := meta.Get("status")
	println("header status", statuses[len(statuses)-1])

	// read stream

@ilgooz @Anthony @krhubert what do you think about it?


After speaking with @krhubert, the acknowledgement system is something good to implement but should not be used for fixing the issue of “when we need to execute a task and listen for its result”.
To solve this problem, we could introduce a new very simple gRPC API that return a result of a executionID, see Get result api


Implemented here:


I’m in favor of ack system. Because in the client side, once the listening request is made, it’s an expected behavior to get stream (event/result/task) data being produced right after func is returned.

Get result api