>> Introspection

After releasing an Elixir project to production, it is time for introspection. Context, the project uses a microservice architecture over a nsq message queue where each service is mostly an umbrella phoenix app that offers a GraphQL interface via absinthe. While I am happy with the language and GraphQL in general, I am not a fan of the complexity that microservices implied. Grievances aside, among the many lessons and insights earned, one of them was worth some research.

The choice for using nsq over rabbitmq, kafka, zeromq or nats was primarily its lightweight nature because of the project's small messaging requirement. With an offbeat decision, does nsq have language support? Thankfully, it does with elixir_nsq but using it directly is clunky for an standard application. Good news, conduit is an library for abstracting message queues inspired by plug that looks nice. Bad news, it does not support nsq. After much time and thought, the best plan was to stick with conduit and attempt to write an adapter library. If the venture failed, the popular rabbitmq is always a fallback. Thankfully, conduit_nsq was born and thus nsq stays.

Reviewing the workflow, so when one service needs something from another, the following steps are taken:

  1. Pick a topic say product
  2. Create a handler for product_request in the responding service.
  3. Implement the domain method for that handler
  4. Implement a blocking method that publishes product_request and waits for product_response in the asking service
  5. Create a handler for waiting on product_response and pass the data to unblock the method

Below is a simplified snippet of the changes needed. (Feel free to skim it and get a general understanding of the structure and form)

# responder/broker.ex
configure do
  queue("product_response")
end

incoming Receiver do
  subscribe(:product_request, ProductRequest, topic: "product_request", channel: "responder")
end

outgoing do
  publish(:product_response, topic: "product_response")
end

# responder/product_request.ex
defmodule Responder.ProductRequest do
  use Conduit.Subscriber

  def process(%Conduit.Message{correlation_id: correlation_id, body: data} = message, _opts) do
    types = %{product_id: :string}

    data =
      {%{}, types}
      |> Ecto.Changeset.cast(data, Map.keys(types))
      |> Ecto.Changeset.validate_required(Map.keys(types))
      |> case do
        %Ecto.Changeset{valid?: true} = changeset ->
          %{product_id: product_id} = Ecto.Changeset.apply_changes(changeset)

          Responder.Products.get_by_id(product_id)

        _ ->
          nil
      end

    %Conduit.Message{}
    |> Conduit.Message.put_body(data)
    |> Conduit.Message.put_correlation_id(correlation_id)
    |> Responder.Broker.publish(:product_response)

    message
  end
end

# receiver/broker.ex
configure do
  queue("product_request")
end

incoming Receiver do
  subscribe(:product_response, ProductResponse, topic: "product_response", channel: "receiver")
end

outgoing do
  publish(:product_request, topic: "product_request")
end

# receiver/client.ex
defmodule Receiver.Client do
  def get_product_by_id(product_id) do
    correlation_id = Ecto.UUID.generate()

    {:ok, _} = Registry.register(Receiver.Client.Registry, correlation_id, :get_product_by_id)

    %Conduit.Message{}
    |> Conduit.Message.put_body(%{product_id: product_id})
    |> Conduit.Message.put_correlation_id(correlation_id)
    |> Broker.publish(:product_request)

    product = receive do
      data -> data
    after
      :timer.seconds(5) -> nil
    end

    :ok = Registry.unregister(Receiver.Client.Registry, correlation_id)

    product
  end
end

# receiver/product_response.ex
defmodule Receiver.ProductResponse do
  use Conduit.Subscriber

  def process(%Conduit.Message{correlation_id: correlation_id, body: data} = message, _opts) do
    types = %{name: :string, price: :float}

    {%{}, types}
    |> Ecto.Changeset.cast(data, Map.keys(types))
    |> Ecto.Changeset.validate_required(Map.keys(types))
    |> case do
         %Ecto.Changeset{valid?: true} = changeset ->
           product = Ecto.Changeset.apply_changes(changeset)

           case Registry.lookup(Receiver.Client.Registry, correlation_id) do
             [{pid, :get_product_by_id}] ->
               send(pid, product)

             _ ->
               nil
           end

         _ ->
           nil
       end

    message
  end
end

The code above is functional and works; however, I do have some concerns:

  • Unstructured Data

Before using the message body, it has to be casted and validated from a string map into an atom map. The team standard is to treat string maps as unstructured or unsafe data before being used or pattern matched even if it comes from the queue but is still boilerplate.

  • Broker Setup

Setting up the topics and identical routes is error prone if not careful. Being confused with the topics used in receiver or handler is not a rare occurrence which led to some self-loathing.

  • Request/Response Boilerplate

Creating two small modules for new service interactions seems menial. Not to say that it does not make sense just that creating multiple tiny files adds up and is a maintenance cost.

Rather, what this reminds me is the comparison between GraphQL and REST. I am not here to argue their merits but I favor GraphQL for establishing a typed contract between the consumer and provider. The problems above can be solved if GraphQL can work with a message queue transport. Fascinating, can it be done?

NOTE: Other communication library or technologies exist such as grpc or thrift to solve the problem; however, this increases the technology stack which is exciting but a risk. While this exercise or idea may not be the most efficient for the task, it works within the constraints and serves as a different perspective of GraphQL.

>> Proof Of Concept

Programmers are optimistic so whether it can be done is yes if not a matter of time; however more than the binary answer, our concern is how does this style or design affect productivity, stability or efficiency of a project. To test this idea, we will be creating a sandbox project with four services for a hypothetical product API:

  • Accounts Service - Holds user data
  • Products Service - Stores product data
  • Transactions Service - Contains the transactions made by a user
  • API Gateway - The main GraphQL interface for integrating the services

The quick sketch of the operations and interactions are:

  1. A user will register through the registerUser mutation via the account service
  2. That user can look at the available products at products query via product service
  3. The user can pick several products by id and use the createTransaction mutation via the transactions service. To make this interesting, this calls the product service to fetch the current price of goods.

In building the project, I took the following steps:

  1. Create the account service and users query
  2. Build the gateway and connect to the other service
  3. Implement and expose the registerUser mutation
  4. Create the product service, the products query, and integrate with it the gateway
  5. Implement the createTransaction mutation
  6. Add User.transactions field
  7. Add GraphiQL to each service

You can follow along or checkout the working code and try it out for yourself which uses docker-compose for ease. Nonetheless, I will be working through the design and implementation process starting with the basic account service.

NOTE: I will only show relevant code in building the research. Things like the migrations, changesets or domain functions will pad the article and is not that interesting. I also assume you are familiar with implementing your own absinthe and phoenix GraphQL API; however, I will not assume you know conduit or nsq.

>> Account Service

Our initial objective is to setup the account service which can be easily setup via mix new --sup account_service. We then set up the following:

We need a database or storage to store the users.

sudo systemctl start postgresql   # Assuming a standard postgres
sudo -u postgres -i

createuser -d -P mq_user          # Enter passwod: mq_pass
createdb mq_db -O mq_user
config :account_service, AccountService.Repo,
  username: "mq_user",
  password: "mq_pass",
  database: "mq_db",
  hostname: "localhost",
  port: 5432,
  show_sensitive_data_on_connection_error: true,
  pool_size: 10

While I could have used an Agent, I wanted to simulate changeset errors when doing mutation testing.

We need a running nsqd at port 14150:

nsqd --tcp-address=0.0.0.0:14150 --http-address=0.0.0.0:14151

While you can use the default port 4150, I prefer to run them in a different port in every project.

config :account_service, AccountService.Broker,
  adapter: ConduitNSQ,
  producer_nsqds: ["localhost:14150"],
  nsqds: ["localhost:14150"]

Setting up the broker.ex with a JSON parsing is easy to setup:

# account_service/broker.ex
defmodule AccountService.Broker do
  use Conduit.Broker, otp_app: :account_service

  @channel "account_service"

  pipeline :serialize do
    plug(Conduit.Plug.Wrap)
    plug(Conduit.Plug.Encode, content_encoding: "json")
  end

  pipeline :deserialize do
    plug(Conduit.Plug.Decode, content_encoding: "json")
    plug(Conduit.Plug.Unwrap)
  end
end

# config.exs
config :conduit, Conduit.Encoding, [
  {"json", Absinthe.Conduit.Json}   # Custom JSON encoding to use Jason
]

Personally, it feels strange install absinthe without phoenix or plug but we will install them near the end.

With a fresh schema.ex, we can start with our User type and users query:

# account_service/schema.ex
object :user do
  description("User of this wonderful platform")

  field(:id, non_null(:id), description: "User ID")
  field(:email, non_null(:string), description: "User email")
  field(:first_name, non_null(:string), description: "User first name")
  field(:last_name, non_null(:string), description: "User last name")
end

query do
  field(:users, non_null(list_of(non_null(:user))),
    resolve: fn _, _, _ ->
      {:ok, AccountService.get_users()}   # Basically Repo.all(User)
    end
  )
end

The corresponding user.ex schema and migration easily follows from that. To check if its all fired up:

mix do ecto.drop, ecto.setup    # ecto.setup = [ecto.create, ecto.migrate, ecto.seed]

iex -S mix run --no-halt
iex> AccountService.Repo.all(AccountService.User)

>> Absinthe via Conduit

Taking our cue from the motivation, we will expose a request and response topic:

# account_service/broker.ex
configure do
  queue("account_graphql_response")
end

incoming AccountService do
  pipe_through([:deserialize])

  subscribe(:graphql_request, GraphQLRequest, topic: "account_graphql_request", channel: @channel)
end

outgoing do
  pipe_through([:serialize])

  publish(:graphql_response, topic: "account_graphql_response")
end

Working through the snippet above, we receive on the account_graphql_request topic and send on the account_graphql_response topic as our convention. More importantly, the request handler module is AccountService.GraphQLRequest where the magic happens:

defmodule AccountService.GraphQLRequest do
  use Conduit.Subscriber

  def process(%Conduit.Message{correlation_id: correlation_id, body: body} = message, _opts) do
    types = %{query: :string, variables: :map, context: :map}

    {%{}, types}
    |> Ecto.Changeset.cast(body, Map.keys(types))
    |> Ecto.Changeset.validate_required([:query])
    |> case do
         %Ecto.Changeset{valid?: true} = changeset ->
           data = Ecto.Changeset.apply_changes(changeset)

           query = Map.fetch!(data, :query)
           variables = Map.get(data, :variables, %{})
           context = Map.get(data, :context, %{})

           # NEXT: What to do here?

         changeset ->
           nil
       end

    # Need to return the message to mark it as acknowledged
    message
  end
end

So for every message, we assume it has a payload/body of a string map with a required query string and optional variables and context map. With an schemaless changeset, those fields are easily extracted. We also require it to have a correlation_id to have a way to return it to the sender.

Browsing around the absinthe documentation, I found Absinthe.run and quickly capitalized:

{:ok, response} = Absinthe.run(query, AccountService.Schema, variables: variables, context: context)

Excellent, we just need to return the data back which is easily done:

%Conduit.Message{}
|> Conduit.Message.put_body(response)
|> Conduit.Message.put_correlation_id(correlation_id)
|> AccountService.Broker.publish(:graphql_response)    # The topic ID in broker.ex, not the topic itself

Testing it out, we run mix run --no-halt again and use the logging commands nsq_tail to watch any messages being published in the GraphQL topics:

# In one terminal
nsq_tail --nsqd-tcp-address=0.0.0.0:14150 --topic=account_graphql_request
# In another terminal
nsq_tail --nsqd-tcp-address=0.0.0.0:14150 --topic=account_graphql_response

Once the listeners are in place, we can curl the HTTP publish endpoint with this JSON payload to simulate a call:

{
  "body": {
    "query": "query { users { email firstName lastName } }"
  },
  "fields": {
    "correlation_id": "ABC"
  },
  "headers": {}
}
curl -i -H Content-Type\:\ application/json -XPOST http\://localhost\:14151/pub\?topic\=account_graphql_request -d \{\"body\"\:\ \{\"query\"\:\"query\ \{\ users\ \{\ email\ firstName\ lastName\ \}\ \}\"\,\ \"variables\"\:\ \{\"a\"\:\ 1\}\}\,\ \"fields\"\:\ \{\"correlation_id\"\:\ 1\}\,\ \"headers\"\:\ \{\}\}

The extra fields and wrapping of the query field is because of the JSON encoding pipeline in Broker which is also the way to set a header/meta field such as correlation_id in the message itself. More importantly, a snippet from the output of account_graphql_response proves that it works.

{
  "body": {
    "data": {
      "users": [
        {
          "email": "hollie.ullrich@towne.net",
          "firstName": "OSCAR",
          "lastName": "MAX"
        },
        {
          "email": "kaitlin2094@gutkowski.biz",
          "firstName": "POPPY",
          "lastName": "MAX"
        },
        {
          "email": "marjolaine2098@lowe.org",
          "firstName": "LILY",
          "lastName": "DAISY"
        },
        {
          "email": "emelia_mosciski@balistreri.info",
          "firstName": "JASPER",
          "lastName": "MAX"
        },
        {
          "email": "arnoldo.stehr@herzog.info",
          "firstName": "LILY",
          "lastName": "SHADOW"
        }
      ]
    }
  },
  "fields": {
    "correlation_id": "ABC",
    "destination": "graphql_response"
  },
  "headers": null
}

>> API Gateway

With account service ready to accept GraphQL queries via message queue, we now set up the gateway to connect our new services. We create our phoenix app via phx.new with the barest of options:

mix phx.new api_gateway --no-ecto --no-html --no-gettext --no-webpack

Similar to account service, we install conduit and absinthe but with absinthe_plug this time:

# api_gateway_web/router.ex
scope "/" do
  pipe_through :api

  forward "/graphql", Absinthe.Plug, schema: ApiGateway.Schema
  forward "/graphiql", Absinthe.Plug.GraphiQL, schema: ApiGateway.Schema
end

The setup for the broker is similar as well:

# api_gateway.broker.ex
@channel "api_gateway"

configure do
  queue("account_graphql_request")
end

incoming ApiGateway do
  pipe_through([:deserialize])

  subscribe(:account_graphql_response, AccountGraphQLResponse, topic: "account_graphql_response", channel: @channel)
end

outgoing do
  pipe_through([:serialize])

  publish(:account_graphql_request, topic: "account_graphql_request")
end

We then copy the User type and users query but this we need to resolve it by calling the GraphQL interface of accounts service:

# api_gateway/schema.ex
field :users, non_null(list_of(non_null(:user))) do
  resolve(fn _, _, _ ->
    # NEXT: What to do here?
  end)
end

We wrap this functionality in a client similar to the motivating example:

# api_gateway/account_client.ex
defmodule ApiGateway.AccountClient do
  def run(query, variables \\ %{}, context \\ %{}) do
    request_id = Ecto.UUID.uuid4()

    {:ok, _} = Registry.register(@registry, request_id, :graphql)

    result =
      %Conduit.Message{}
      |> Conduit.Message.put_correlation_id(request_id)
      |> Conduit.Message.put_body(%{
          query: query,
          variables: variables,
          context: context
      })
      |> ApiGateway.Broker.publish(:account_graphql_request)
      |> case do
           {:ok, _} ->
             receive do
               response -> {:ok, response}
             after
               :timer.seconds(15) -> {:error, :response_timeout}
             end
           error -> error
         end

      :ok = Registry.unregister(@registry, request_id)

      result
  end
end

To call the account service users query in the resolver:

# api_gateway/schema.ex
AccountClient.run("""
query {
  users {
    id
    email
    firstName
    lastName
  }
}
""")
|> case do
     {:ok, %{"data" => %{"users" => users}}} ->
       {:ok, users}

     error ->
       error
  end
end)

Notice the naming convention in the query or GraphQL in general is camelCase while for Elixir it is under_scored. The issue is that absinthe expects the data to be in under_score and converts it into camelCase. So when we use the query above, first_name since it receives firstName. As a quick fix, we can convert the data received from the response via atomic_map or create a middleware to do the same. Either fix is converting converted data which seems inefficient, instead vwe can tell absinthe to retain the under_score convention by setting the adapter to Absinthe.Adapter.Passthrough:

# account_service/config/config.exs
config :absinthe,
  adapter: Absinthe.Adapter.Passthrough

# api_gateway/schema.ex
"""
query {
  users {
    id
    email
    first_name
    last_name
  }
}
"""

This is also useful when other services need to communicate with each other later on. Within the internal network or system, the convention is under_score; when it leaves the gateway, it becomes camelCase. One more thing remains before it completely works, the response from a GraphQL endpoint is a string map and by default absinthe only picks up atom keys. So even with the correct keys, we need to tell absinthe to pick up both atom and string keys. Taking cues from the default middleware guide, we can change the default middleware to check for both:

# api_gateway/schema.ex
def middleware(middleware, %{identifier: identifier} = field, object) do
  field_name = Atom.to_string(identifier)

  new_middleware_spec = {{__MODULE__, :get_field_key}, {field_name, identifier}}

  Absinthe.Schema.replace_default(middleware, new_middleware_spec, field, object)
end

def get_field_key(%Resolution{source: source} = res, {key, fallback_key}) do
  new_value =
    case Map.fetch(source, key) do
      {:ok, value} ->
        value

      :error ->
        Map.get(source, fallback_key)
    end

  %{res | state: :resolved, value: new_value}
end

All pieces are now in place and we can use the GraphiQL endpoint(http://localhost:14000/graphiql) to check our work. Running the original query above leads to the same result as the account service.

>> registerUser Mutation

With a working query, we can test mutations with a new registerUser mutation which should look like this:

mutation($email: String!, $firstName: String!, $lastName: String!) {
  register_user(
    input: {
      email: $email
      firstName: $firstName
      lastName: $lastName
    }
  ) {
    errors {
      field
      message
    }
    user {
      id
      email
      firstName
      lastName
    }
  }
}

Aside from using absinthe_relay, creating the whole vertical slice for the mutation from the account service to the gateway is straightforward. Calling a mutation and passing the arguments from one endpoint to another is surprisingly clear:

# api_gateway/schema.ex
resolve(fn _, args, res ->
  AccountClient.run(
    """
    mutation($email: String!, $first_name: String!, $last_name: String!) {
      register_user(
        input: {
          email: $email
          first_name: $first_name
          last_name: $last_name
        }
      ) {
        errors {
          field
          message
        }
        user {
          id
          email
          first_name
          last_name
        }
      }
    }
    """,
    args       # HERE: Passing arguments
  )
  |> case do
       {:ok, %{"data" => %{"register_user" => register_user}}} ->
         {:ok, register_user}
       error ->
         error
     end
end)

Going back the reason we used a database instead of an Agent is to test concurrency errors specially when two instances of the same service receives the same request. Thanks to ecto, I am confident that this cannot happen because of the unique database index; however, what if the operation has no index like with the upcoming createTransaction mutation? Our message queue also has our back with this because when two or more clients subscribe to the same topic but all the same channel, only one gets it (see NSQ Design); however, messages can be still delivered many times (see NSQ Guarantees) so we are back where we started. Message deduplication is beyond the scope of this investigation. (ConduitPlugs.Deduplication is my quick stab at that.)

I have another concern but that is down the road and we can move on for now. If you do want to simulate the issue above, you can do the following:

# account_service/broker.ex
# If you want to have different channels for every run,
# replace channel with a random value
subscribe(
  :graphql_request,
  GraphQLRequest,
  topic: "account_graphql_request",
  channel: "#{@channel}-#{Enum.random(1..100)}"  # HERE: From @channel
)
# In one terminal
mix run --no-halt
# In another terminal
mix run --no-halt

# With docker
docker-compose up --scale account_service=2

>> Product and Transaction Service

What we want next is to implement the following query:

mutation($items: [CreateTransactionItem!]!) {
  createTransaction(
    input: {
      items: $items
    }
  ) {
    errors {
      field
      message
    }
    transaction {
      id
      code
      items {
        id
        productId
        price
        quantity
      }
    }
  }
}

The idea is that the user will buy a list of products from this API. To test that out, we need to create the product and transaction service. Given our template with account service, we can quickly copy and paste to produce both services. For products, it exposes the two queries:

query {
  products {
    id
    code
    name
    price
  }
  product($id: ID!) {
    ...
  }
}

The first one is self-evident like with users query, the second one is where we make it interesting. The createTransaction accepts a list of product id and corresponding quantity. For each product id, the transaction service will invoke the product query to fetch the current price. Not the most efficient which is another issue down the road but it does work.

resolve(fn _, args, res ->
  TransactionClient.run(
    """
    mutation($items: [CreateTransactionItem!]!) {
      create_transaction(
        input: {
          items: $items
        }
      ) {
        errors {
          field
          message
        }
        transaction {
          id
          code
          items {
            id
            product_id
            price
            quantity
          }
        }
      }
    }
    """,
    args,
    res.context     # HERE: Context based here
  )
  |> case do
       {:ok, %{"data" => %{"create_transaction" => create_transaction}}} ->
         {:ok, create_transaction}

       error ->
         error
     end
end)

For the astute reader, the user_id is nowhere to be seen so how will the transaction know which user is assigned? It comes from the OAauth header Authorization like with guardian but uses the user email instead:

# api_gateway/context.ex
defmodule ApiGatewayWeb.Context do
  @behaviour Plug

  alias Plug.Conn

  def init(opts), do: opts

  def call(conn, _) do
    context = build_context(conn)

    Absinthe.Plug.put_options(conn, context: context)
  end

  def build_context(conn) do
    %{
      current_user: get_current_user(conn)
    }
  end

  defp get_current_user(conn) do
    conn
    |> Conn.get_req_header("authorization")
    |> List.first()
    |> case do
      "Bearer " <> email ->
        AccountClient.get_by_email(email)

      _ ->
        nil
    end
  end
end

It just follows the context and authentication guide for a quick authentication plug. More importantly, the question is how does context passing occur over multiple services? In this case, the context is passed down the wire and read the same way; however, the better strategy is to make the user_id an explicit argument than a context variable. Nonetheless, it works like a charm but you need to handle string maps again:

# transaction_service/parse_context.ex
defmodule TransactionService.ParseContext do
  @behaviour Absinthe.Middleware

  def call(%{state: :unresolved} = res, _opts) do
    ctx = res.context

    id = get_in(ctx, ["current_user", "id"])

    %{res | context: %{current_user: %{id: id}}}
  end

  def call(res, _), do: res
end

To test this mutation in GraphiQL, you need to do the following things:

  1. Register a new user or pick a random user from users query and take note of the email($EMAIL)
  2. Add the header Authorization: Bearer $EMAIL which you can do via Headers > Add button
  3. Pick a random product and take note of its id ($PRODUCT_ID)
  4. Copy the source mutation above and use the payload via variables:
{
    "items": [
        {
            "productId": "$PRODUCT_ID",
            "quantity": 1
        }
    ]
}

With that we have one thing left to verify the experiment.

>> transactions Field

Once the user has made transactions, we will add a User.transactions field to fetch that:

query {
  users {
    id
    email
    transactions {
      id
      code
      items {
        product {
          id
          code
          name
          price
        }
        price
        quantity
      }
    }
  }
}

The interesting fact here is that the gateway now provides a custom field and fetches multiple services to provide that. This is not Apollo Federation but it is rather fun. Anyway to resolve User.transactions, we need the transaction service to return all transactions for a user via user_transaction query:

field :transactions, non_null(list_of(non_null(:user_transaction))) do
  description("User transactions")

  resolve(fn %{"id" => id}, _, res ->
    TransactionClient.run(
      """
      query($user_id: ID!) {
        user_transactions(user_id: $user_id) {
          id
          code
          items {
            id
            product_id
            price
            quantity
          }
        }
      }
      """,
      %{user_id: id},
      res.context
    )
    |> case do
         {:ok, %{"data" => %{"user_transactions" => user_transactions}}} ->
           {:ok, user_transactions}

         error ->
           error
       end
  end)
end

Lastly, we reuse the product query for product service to provide the TransactionItem.product field:

field :product, non_null(:product) do
  description("Transaction product")

  resolve(fn %{"product_id" => product_id}, _, res ->
    ProductClient.run(
      """
      query($id: ID!) {
        product(id: $id) {
          id
          code
          name
          price
        }
      }
      """,
      %{id: product_id},
      res.context
    )
    |> case do
         {:ok, %{"data" => %{"product" => product}}} ->
           {:ok, product}

         error ->
           error
       end
  end)
end

With all that, it now works although slow. Indeed, this is where Batch or Dataloader middleware come in and where each service must provide the supporting queries. In general, each communication increases latency and risk but natural under the microservice style.

Another issue I noticed with this approach is that multiple services are listening to the same GraphQL response topic. Remember that the gateway and transaction service are listening to the product response so if either service makes a GraphQL request, both receive the response but only one will find it useful. As this scales, so will the waste thus the service dependencies must be watched.

Now that all services are ready, we can the final touch.

>> GraphiQL

One of the nice things about GraphQL is GraphiQL which alongside the typed contract forces some form of documentation and experimentation. We should also put absinthe_plug via plug for each service to make it easier to access and introspect:

# account_service/router.ex
defmodule AccountService.Router do
  use Plug.Router

  plug Plug.RequestId
  plug Plug.Logger

  plug Plug.Parsers,
    parsers: [:urlencoded, :multipart, :json, Absinthe.Plug.Parser],
    pass: ["*/*"],
    json_decoder: Jason

  plug :match

  forward "/graphql",
    to: Absinthe.Plug,
    init_opts: [schema: AccountService.Schema]

  forward "/graphiql",
    to: Absinthe.Plug.GraphiQL,
    init_opts: [schema: AccountService.Schema, interface: :advanced]

  plug :dispatch
end

# account_service/application.ex
children = [
  {Plug.Cowboy, scheme: :http, plug: AccountService.Router, options: [port: 15000]},
  Broker,
  Repo
]

For each service, you might also nned the middleware fix because the GraphiQL UI cannot handle under_score convention or that it reverts back to being camelCase. Whichever the case, having a working GraphQL UI is wonderful. The ports for each service is listed below:

Now that we are done, what have we learned in this encounter?

>> Review

Providing a GraphQL interface over a message queue was fun. It was not as hard as I thought it would be initially, but now is the time to address some issues and lessons I learned while writing it.

First off, the workflow with using GraphQL is more focused on providing a good API schema which feels better than worrying about message queue topics or mechanics; however, writing the query strings are bulky and error prone in its own right. I can imagine writing multiple queries and either making a typo or forgetting to update old queries. Testing should catch this, but I am not aware of tooling like GraphQL fragments. I suppose this is expected as this is the same workflow with frontend engineers consuming GraphQL interfaces.

One of the lost GraphQL qualities during this experiment was that fields could be fetched on demand or query optimization. For example, when a user wants to only fetch their ids, the internal GraphQL query to account service fetches all fields. More strongly for the User.transactions query, it fetches all the way down. I tried doing some optimization myself using the definitions in the Absinthe.Resolution:

# product_service/schema.ex
resolve(fn %{"product_id" => product_id}, _, res ->
  ProductClient.run(
    """
    query($id: ID!) {
      product(id: $id) {
      #{to_field_query(res.definitions.selections)}
      }
    }
    """,
    %{id: product_id},
    res.context
  )
end)

However, this only works for flat fields; when nested fields are needed, the complexity spikes. Instead of handling it and dealing with edge cases such as missing ids or fields, I think its fine to pull all the fields since they are in an internal network. Perhaps one can find gold with the selections variable for query optimization?

One concern close to my heart is testing which I did not go into detail here since you need conduit experience is better in my opinion. The usual tests for each subscriber module is to check for side effects and whether messages where acked or not, now we the focus is satisfying that GraphQL contract.

Another concern is error handling. For example a query is outdated and is always producing a query error instead of an operation error, how does the backend cope? It is good to assume external communication can fail even if they are known assumptions. Still, the only option is to log it and hopefully catch it in development..

Finally the shadow of message deduplication and unwanted response recipients mean that this method may not be the best fit perhaps even for long term.

>> Summary

To summarize all the findings:

Pros

  • Schema Management

Managing schema and types is far more interesting than handling routes

  • GraphiQL Interface-

The GraphQL interface is a nice introspection tool.

  • Unified Request/Response

Only one topic to manage all the request/response messages.

  • GraphQL Testing

Testing GraphQL responses feel better than asserting message acknowledgment or side-effects from handlers.

Cons

  • Higher Latency

More messages means higher latency. Care must be exercised in making the gateway as fast as it can be.

  • Messages Deduplication

Mutations should consider messages being delivered at least once.

  • Unnecessary Subscribers

Since every GraphQL response endpoint has many subscribers, a response to one is broadcast to all making it garbage to all but one.

  • Query Bulk And Support

Queries occupy space and is just as error prone in editing.

  • Query Optimization

GraphQL allows select fields to be pulled but the backend must pull all the fields for safety.

To be safe, I am not vouching this for production as better techniques may already exist. The request/response module per message is still safe and efficient. If ever I would make an API gateway, I might consider revisiting this or pray that absinthe has better guidance or support at that time. Nonetheless, this was a fun excursion.