Building a simple service relay for Dynamics 365 CE with RabbitMQ and Python – part 3

综合编程 2018-02-06 阅读原文

In my lastpost in this series, I walked through the prerequisites for building a simple service relay for Dynamics 365 CE with RabbitMQ and Python. In today's post I will show the Python code to make the service relay work.

As I described in thefirst post in this series, this approach relies on a consumer process and a queue listener process that can both access a RabbitMQ message broker.

A consumer writes a request to a cloud-hosted RabbitMQ request queue (either directly or through a proxy service) and starts waiting for a response. On the other end, a Python script monitors the request queue for inbound requests. When it sees a new one, it executes the appropriate request through the Dynamics 365 Web API and writes the response back to a client-specific RabbitMQ response queue. The consumer then picks up the response from the queue.

This solution is based on the remote procedure call (RPC) approach shown here . The main difference is that I have added logic to the queue monitoring script to query the Dynamics 365 Web API based on the inbound request from the consumer.

Consumer sample

The consumer does the following:

  1. Read the text of the request to write to the queue from a command-line argument.
  2. Establish a connection to the RabbitMQ broker.
  3. Create a new anonymous, exclusive callback queue.
  4. Write a request message a queue called "rpc_queue." This message will include the callback queue as its "reply_to" property.
  5. Monitor the callback queue for a response.

There's no validation in this sample, so if you run it without a command-line argument, it will just throw an error and exit.

import sys
import pika
import uuid
import datetime

class CrmRpcClient(object):
    def __init__(self):
        #RabbitMQ connection details
        self.rabbituser = 'crmuser'
        self.rabbitpass = 'crmpass'
        self.rabbithost = '' 
        self.rabbitport = 5672
        self.rabbitqueue = 'rpc_queue'
        rabbitcredentials = pika.PlainCredentials(self.rabbituser, self.rabbitpass)
        rabbitparameters = pika.ConnectionParameters(host=self.rabbithost,

                self.rabbitconn = pika.BlockingConnection(rabbitparameters) =

        #create an anonymous exclusive callback queue
        result =
        self.callback_queue = result.method.queue, no_ack=True,

    #callback method for when a response is received - note the check for correlation id
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    #method to make the initial request
    def call(self, n):
        self.response = None
        #generate a new correlation id
        self.corr_id = str(uuid.uuid4())

        #publish the message to the rpc_queue - note the reply_to property is set to the callback queue from above'',
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
        while self.response is None:
        return self.response

#instantiate an rpc client
crm_rpc = CrmRpcClient()

#read the request from the command line
request = sys.argv[1]

#make the request and get the response
print(" [x] Requesting crm data("+request+")")
print(" [.] Start time %s" % str(
response =

#convert the response message body from the queue to a string 
decoderesponse = response.decode()

#print the output
print(" [.] Received response: %s" % decoderesponse)
print(" [.] End time %s" % str(

Queue listener sample

The queue listener does the following:

  1. Establish a connection to the RabbitMQ broker
  2. Monitor "rpc_queue" queue.
  3. When a new message from the "rpc_queue" queue is delivered, decode the message body as a string, and determine what Web API query to execute. Note: This sample can return a list of contacts or accounts from Dynamics 365 CE based on the request the consumer sends ("getcontacts" or "getaccounts"). If any other request is received, the listener will return an error message to the consumer callback queue.
  4. Execute the appropriate query against the Dynamics 365 Web API and write the response to the callback queue the client established originally.
import pika
import requests
from requests_ntlm import HttpNtlmAuth
import json

#NTLM credentials to access on-prem Dynamics 365 Web API
username = 'DOMAIN\USERNAME'
userpassword = 'PASSWORD'

#full path to Web API
crmwebapi = ''

#RabbitMQ connection details
rabbituser = 'crmuser'
rabbitpass = 'crmpass'
rabbithost = '' 
rabbitport = 5672

#method to execute a Web API query based on the client request
def processquery(query):
    #set the Web API request headers
    crmrequestheaders = {
        'OData-MaxVersion': '4.0',
        'OData-Version': '4.0',
        'Accept': 'application/json',
        'Content-Type': 'application/json; charset=utf-8',
        'Prefer': 'odata.maxpagesize=500',
        'Prefer': 'odata.include-annotations=OData.Community.Display.V1.FormattedValue'

    #determine which Web API query to execute
    if query == 'getcontacts':
        crmwebapiquery = '/contacts?$select=fullname,contactid'
    elif query == 'getaccounts':
        crmwebapiquery = '/accounts?$select=name,accountid'
        #only handle 'getcontacts' or 'getaccounts' requests
        return 'Operation not supported'

    #execute the query
    crmres = requests.get(crmwebapi+crmwebapiquery, headers=crmrequestheaders,auth=HttpNtlmAuth(username,userpassword))
    #get the results json
    crmjson = crmres.json()

    #return the json
    return crmjson

#method to handle new inbound requests
def on_request(ch, method, props, body):
    #convert the message body from the queue to a string
    decodebody = body.decode('utf-8')

    #print the request
    print(" [.] Received request: '%s'" % decodebody)

    #process the request query
    response = processquery(decodebody)

    #publish the response back to 'reply-to' queue from the request message and set the correlation id
                     properties=pika.BasicProperties(correlation_id = 
                     body=str(response).encode(encoding="utf-8", errors="strict"))
    ch.basic_ack(delivery_tag = method.delivery_tag)

print(" [x] Awaiting RPC requests")

#connect to RabbitMQ broker
rabbitcredentials = pika.PlainCredentials(rabbituser, rabbitpass)
rabbitparameters = pika.ConnectionParameters(host=rabbithost,
rabbitconn = pika.BlockingConnection(rabbitparameters)
channel =

#declare the 'rpc_queue' queue

#set qos settings for the channel

#assign the 'on_request' method as a callback for when new messages delivered from the 'rpc_queue' queue
channel.basic_consume(on_request, queue='rpc_queue')

#start listening for requests

Trying it out

As I mentioned in my last post, I initially wrote my code to use a RabbitMQ broker running on my local PC, so that's why the connections in the samples show as the host. For a demo, I've spun up a copy of RabbitMQ in a Docker container in the cloud and updated my connection parameters accordingly, but I am still running my queue listener and consumer processes on my local PC.

When the listener first starts, it displays a simple status message.

Then I execute a "getcontacts" request from the consumer in a separate window.

From the timestamps before and after the request, you can see the round-trip time is less than .2 seconds, which includes two round trips between my local PC and the cloud-based RabbitMQ broker plus the actual query processing time in my local Dynamics 365 CE org.

Then I execute a "getaccounts" request.

This request was also fulfilled in less than .2 seconds.

Finally I execute an invalid request to show what the error response looks like.

You'll note the total time from request to response is only about .05 seconds less than the total time for the valid queries. That indicates most of the time used in these samples is being spent on the round trips between my local PC and the RabbitMQ broker, which is not surprising.

Meanwhile, the queue listener wrote a simple status update for every request it received. If I were using this in production, I would use more sophisticated logging.

Wrapping up

That's it for now. In my next (and final) post in this series, I will show how you can use Azure Functions to make a consumer service proxy so consuming applications don't have to access to your RabbitMQ broker directly, and I will also discuss some general thoughts on security and scalability for the service .

责编内容by:Alexander Development 【阅读原文】。感谢您的支持!


小白入门微服务(1) – RPC 初体验,python、nodejs互调... 概述 前言 什么是 RPC RPC 原理 常用 RPC 框架对比 thrift 基础 python、nodejs 互调 后记 前言 上一篇文章中,我们初步了解了什么是微服务,那么我们这次来体验一下微服务中是怎么通信的。 如果我的文章对你...
Python基础学习参考(七):字典和集合... 一.字典 字典跟列表一样是一组数据的集合。它的特点是什么呢? 特点一:字典具有键(key)和值(value),其中键必须是唯一的,不可重复的,即键必须可以哈希的。对于值没有要求。 特点二:字典是无序的。 1. 字典的定义: 字典通过“{ }”来定义的,并且里面的键和值形式...
你以为的延时队列真的是你以为的么 特点:定期轮训数据库,设置状态。 优点:实现简单 缺点:数据量过大时会消耗太多的IO资源,效率太低 复制代码 2.2 DelayQueue 特点: 无界、延迟、阻塞队列 a、BlockingQueue+PriorityQueue(堆排序)+Delayed b、DelayQueue中存...
《RabbitMQ Tutorial》译文 第 3 章 发布和订阅... 原文来自 RabbitMQ 英文官网 的 教程(3.Publish and Subscribe) ,其示例代码采用了 .NET C# 语言。 In the previous tutorial we created a work queue. The assumption behin...
A bite of Python Being easy to pick up and progress quickly towards developing larger and more complicated applications, Python is becoming increasingly ubiquitous in ...