Binary data over Phoenix sockets

TL;DR

   How to send raw binary data through channels in the Phoenix framework and read the data on the client. We will be using the MessagePack format to deliver the payload, gzipping when it makes sense to do so.
  MessagePack you say? Well, what's wrong with JSON?

   Nothing! No, really! Except... it could be leaner. You see, over here at Stoiximan we are delivering lots and lots of data to our customers, so even minor gains from each payload translate to lots of bytes saved in the end, both for our customers' mobile data plans and for our server load. Plus, all the cool kids use raw bytes and we should too.
   You mentioned gzipping above. Everybody and their moms know that gzipped JSON data are smaller than their msgpacked+gzipped counterparts!

   True! But our payloads are usually small enough that gzipping does not make sense. So most of the time the data we send will only be msgpacked, and msgpack sizes are (generally speaking) smaller than JSON sizes. If your data do not fit this description though, and your app is chunky rather than chatty, then by all means use JSON. That's right, this does mean less work for you!
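   For a rough feel of the difference, here's a quick comparison you can run in iex -S mix once the project below is set up (the sample payload is made up, the exact byte counts depend entirely on your own data, and Poison is the JSON package Phoenix already ships with):
  [code]# quick size comparison in iex -S mix (illustrative only)
payload = %{"event" => "odds_update", "market" => 12345, "selections" => [1, 2, 3], "suspended" => false}

byte_size(Poison.encode!(payload))                          # bytes as JSON
byte_size(MessagePack.pack!(payload, enable_string: true))  # bytes as msgpack
# for small maps of short strings, integers and booleans like this one,
# the msgpack encoding usually comes out a bit smaller than the JSON one[/code]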
  Anything else I should know before I do this?

   The techniques described here will only work with fairly modern browsers, i.e. those supporting websockets (which allow for raw byte sending and receiving) and TypedArrays, so IE 10 and above is what we're targeting here. If you need to support older browsers you will have to connect with the longpolling transport and send JSON data (possibly gzipped+base64'd, i.e. text data, but then you should take the 4:3 base64 expansion into account when deciding whether gzipping is worth it), doing some capability detection work to find out what's supported. None of that will be shown here, to keep this post focused.
  OK, you got me somewhat interested and I'm not scared by the extra work, how do I do it?

   Go ahead and create a new folder named binary_data_over_phoenix_sockets/, cd inside and create a sample project with mix phoenix.new . --no-ecto from the command line. Press Y when asked to fetch dependencies. Open up mix.exs and add msgpack-elixir as a dependency:
  [code]# file: mix.exs

defp deps do  
  [
    # ...other deps go here
    {:message_pack, "~> 0.2.0"}
  ]
end[/code]   Then run mix deps.get to download it.
   The first thing we need to do now is create our own msgpack serializer to use with Phoenix transports, looking of course at the default WebSocketSerializer.ex implementation for inspiration. Create the folder web/transports and add a new file message_pack_serializer.ex:
  [code]# file: web/transports/message_pack_serializer.ex

defmodule BinaryDataOverPhoenixSockets.Transports.MessagePackSerializer do  
  @moduledoc false

  @behaviour Phoenix.Transports.Serializer

  alias Phoenix.Socket.Reply
  alias Phoenix.Socket.Message
  alias Phoenix.Socket.Broadcast

  # only gzip data above 1K
  @gzip_threshold 1024

  def fastlane!(%Broadcast{} = msg) do
    {:socket_push, :binary, pack_data(%{
      topic: msg.topic,
      event: msg.event,
      payload: msg.payload
    })}
  end

  def encode!(%Reply{} = reply) do
    packed = pack_data(%{
      topic: reply.topic,
      event: "phx_reply",
      ref: reply.ref,
      payload: %{status: reply.status, response: reply.payload}
    })
    {:socket_push, :binary, packed}
  end

  def encode!(%Message{} = msg) do
    # We need to convert the Message struct into a plain map for MessagePack to work properly.
    # Alternatively we could have implemented the Enumerable behaviour. Pick your poison :)
    {:socket_push, :binary, pack_data(Map.from_struct msg)}
  end

  # messages received from the clients are still in JSON format;
  # for our use case clients are mostly passive listeners, so it made
  # no sense to optimize incoming traffic
  def decode!(message, _opts) do
    message
    |> Poison.decode!()
    |> Phoenix.Socket.Message.from_map!()
  end

  defp pack_data(data) do
    msgpacked = MessagePack.pack!(data, enable_string: true)
    gzip_data(msgpacked, byte_size(msgpacked))
  end

  defp gzip_data(data, size) when size < @gzip_threshold, do: data
  defp gzip_data(data, _size), do: :zlib.gzip(data)
end[/code]   Anyone who bothered to click on the default implementation link will immediately see that the code is essentially the same, except that we're now using MessagePack to serialize the payload (duh) and telling Phoenix that payloads are :binary rather than :text data (which relates to websocket frame handling).
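   If you want to convince yourself that the gzip threshold works before wiring up the rest, here's a rough check you can run in iex -S mix (the topics, events and payloads below are made up for illustration; the magic-bytes pattern match is the same check the client will do later):
  [code]# rough sanity check in iex -S mix (sample messages made up for illustration)
alias Phoenix.Socket.Message
alias BinaryDataOverPhoenixSockets.Transports.MessagePackSerializer, as: Serializer

{:socket_push, :binary, small} =
  Serializer.encode!(%Message{topic: "test:lobby", event: "tick", payload: %{"n" => 1}, ref: nil})
byte_size(small)   # well below the 1K threshold, so plain msgpack

{:socket_push, :binary, large} =
  Serializer.encode!(%Message{topic: "test:lobby", event: "tick", payload: %{"data" => Enum.to_list(1..2000)}, ref: nil})
<<0x1F, 0x8B, _rest::binary>> = large   # gzip magic bytes, so this one got gzipped[/code]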
   Now open the web/channels/user_socket.ex file and override the default serializer with our own
  [code]# file: web/channels/user_socket.ex
...
  transport :websocket, Phoenix.Transports.WebSocket,
    serializer: BinaryDataOverPhoenixSockets.Transports.MessagePackSerializer
...[/code]   We will also need a channel to do the actual talking, so go ahead and run mix phoenix.gen.channel Test from the command line, which will create the file web/channels/test_channel.ex. Follow the generator's suggestion and add this channel to user_socket.ex:
  [code]# file: web/channels/user_socket.ex

# add this line above the transport
channel "test:lobby", BinaryDataOverPhoenixSockets.TestChannel[/code]   Note: depending on your phoenix version, you may need to run mix phoenix.gen.channel Test lobby instead.
   We should now add two callbacks that return some sample responses to showcase both plain and gzipped payload delivery. Open up test_channel.ex and add the following lines inside the TestChannel module:
  [code]# file: web/channels/test_channel.ex

def handle_in("small", _payload, socket) do  
  push socket, "small_reply", %{"small response that will only be msgpacked" => true}
  {:noreply, socket}
end

def handle_in("large", _payload, socket) do  
  push socket, "large_reply", %{"large response that will be msgpacked+gzipped" =>  1..1000 |> Enum.map(fn _ -> 1000 end) |> Enum.into([])}
  {:noreply, socket}
end[/code]   And that wraps up the server portion of things! Run mix phoenix.server to fire up the app, open your modern browser and navigate to http://localhost:4000
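   If you'd like to sanity-check the channel without a browser, a minimal test along the following lines should do; this is only a sketch, assuming the ChannelCase module that mix phoenix.new generated under test/support/ and the join/3 callback produced by the channel generator:
  [code]# file: test/channels/test_channel_test.exs (a minimal sketch)

defmodule BinaryDataOverPhoenixSockets.TestChannelTest do
  use BinaryDataOverPhoenixSockets.ChannelCase

  alias BinaryDataOverPhoenixSockets.TestChannel

  test "small requests get a msgpack-only reply pushed back" do
    {:ok, _join_reply, socket} = subscribe_and_join(socket(), TestChannel, "test:lobby")

    push socket, "small", %{}

    # assert_push intercepts the message before it hits the serializer,
    # so we can match on the plain payload map here
    assert_push "small_reply", %{"small response that will only be msgpacked" => true}
  end
end[/code]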
  The client

  Time to get crackin' on the client. First things first: we need to gunzip data sent from the server (if gzipped), and we need to unpack the msgpacked message. Unfortunately there is no native browser support for any of these things, so we'll need to get a little creative.
  Unzipping:

   We will be using a stripped-down version of imaya's zlib.js, since we only need the unzipping part. Go ahead and create the file gunzip.js under web/static/js/vendor/ and paste in the contents from the URL shown below.
  [code]// file: web/static/js/vendor/gunzip.js

// Paste in here the contents from https://github.com/StoiximanServices/blog/blob/master/binary_data_over_phoenix_sockets/web/static/js/vendor/gunzip.js[/code]  Unpacking:

   Again, we only need to unpack messages on the client, no need for packing. Searching around did not produce anything that fit our needs, so we get to roll our own MessagePack decoder, yay! Don't worry, the MessagePack spec is pretty straightforward (no need for lookahead/lookbehind), so modeling the decoder with a simple state machine should not pose too much of a problem (not to mention that this should be fast as well!).
   Create file web/static/js/msgpack.js and put the following inside:
  [code]// file: web/static/msgpack.js

let formats = {  
  positiveFixIntStart: 0x00,
  positiveFixIntEnd: 0x7F,
  fixMapStart: 0x80,
  fixMapEnd: 0x8F,
  fixArrStart: 0x90,
  fixArrEnd: 0x9F,
  fixStrStart: 0xA0,
  fixStrEnd: 0xBF,
  nil: 0xC0,
  none: 0xC1,
  bFalse: 0xC2,
  bTrue: 0xC3,
  bin8: 0xC4,
  bin16: 0xC5,
  bin32: 0xC6,
  ext8: 0xC7,
  ext16: 0xC8,
  ext32: 0xC9,
  float32: 0xCA,
  float64: 0xCB,
  uint8: 0xCC,
  uint16: 0xCD,
  uint32: 0xCE,
  uint64: 0xCF,
  int8: 0xD0,
  int16: 0xD1,
  int32: 0xD2,
  int64: 0xD3,
  fixExt1: 0xD4,
  fixExt2: 0xD5,
  fixExt4: 0xD6,
  fixExt8: 0xD7,
  fixExt16: 0xD8,
  str8: 0xD9,
  str16: 0xDA,
  str32: 0xDB,
  array16: 0xDC,
  array32: 0xDD,
  map16: 0xDE,
  map32: 0xDF,
  negativeFixIntStart: 0xE0,
  negativeFixIntEnd: 0xFF
}

/*
decode returns two-element [pos, data] arrays: index 0 holds the new position of the parser, and index 1 contains the parsed data. We carry around the original
binary data array to avoid copying to new slices, while updating the parser position and recursively calling decode until we've consumed the whole buffer.
Missing from this implementation is extension support; add it if you need it.
*/
let decode = function(binaryData, start){

  start = start || 0;
  let format = binaryData[start];

  if(format <= formats.positiveFixIntEnd){
    return [start + 1, format - formats.positiveFixIntStart];
  }
  if(format <= formats.fixMapEnd){
    let keyCount = format - formats.fixMapStart;
    return parseMap(binaryData, keyCount, start + 1);
  }
  if(format <= formats.fixArrEnd){
    let len = format - formats.fixArrStart;
    return parseArray(binaryData, len, start + 1);
  }
  if(format <= formats.fixStrEnd){
    let len = format - formats.fixStrStart;
    return parseUtf8String(binaryData, len, start + 1);
  }

  let pos, len;

  switch(format){
    case formats.nil:
      return [start + 1, null];
    case formats.bFalse:
      return [start + 1, false];
    case formats.bTrue:
      return [start + 1, true];
    case formats.bin8:
      [pos, len] = parseUint(binaryData, 8, start + 1)
      return parseBinaryArray(binaryData, len, pos);
    case formats.bin16:
      [pos, len] = parseUint(binaryData, 16, start + 1)
      return parseBinaryArray(binaryData, len, pos);
    case formats.bin32:
      [pos, len] = parseUint(binaryData, 32, start + 1)
      return parseBinaryArray(binaryData, len, pos);
    case formats.float32:
      return parseFloat(binaryData, 32, start + 1);
    case formats.float64:
      return parseFloat(binaryData, 64, start + 1);
    case formats.uint8:
      return parseUint(binaryData, 8, start + 1);
    case formats.uint16:
      return parseUint(binaryData, 16, start + 1);
    case formats.uint32:
      return parseUint(binaryData, 32, start + 1);
    case formats.uint64:
      return parseUint(binaryData, 64, start + 1);
    case formats.int8:
      return parseInt(binaryData, 8, start + 1);
    case formats.int16:
      return parseInt(binaryData, 16, start + 1);
    case formats.int32:
      return parseInt(binaryData, 32, start + 1);
    case formats.int64:
      return parseInt(binaryData, 64, start + 1);
    case formats.str8:
      [pos, len] = parseUint(binaryData, 8, start + 1);
      return parseUtf8String(binaryData, len, pos);
    case formats.str16:
      [pos, len] = parseUint(binaryData, 16, start + 1);
      return parseUtf8String(binaryData, len, pos);
    case formats.str32:
      [pos, len] = parseUint(binaryData, 32, start + 1);
      return parseUtf8String(binaryData, len, pos);
    case formats.array16:
      [pos, len] = parseUint(binaryData, 16, start + 1);
      return parseArray(binaryData, len, pos);
    case formats.array32:
      [pos, len] = parseUint(binaryData, 32, start + 1);
      return parseArray(binaryData, len, pos);
    case formats.map16:
      [pos, len] = parseUint(binaryData, 16, start + 1);
      return parseMap(binaryData, len, pos);
    case formats.map32:
      [pos, len] = parseUint(binaryData, 32, start + 1);
      return parseMap(binaryData, len, pos);
  }

  if(format >= formats.negativeFixIntStart && format <= formats.negativeFixIntEnd){
    return [start + 1, - (formats.negativeFixIntEnd - format + 1)]
  }

  throw new Error("I don't know how to decode format ["+format+"]");
}

function parseMap(binaryData, keyCount, start){  
  let ret = {};
  let pos = start;
  for(let i = 0; i < keyCount; i++){
    let [keypos, key] = decode(binaryData, pos);
    pos = keypos;
    let [valpos, value] = decode(binaryData, pos)
    pos = valpos;
    ret[key] = value;
  }
  return [pos, ret];
}

function parseArray(binaryData, length, start){  
  let ret = [];
  let pos = start;
  for(let i = 0; i < length; i++){
    let [newpos, data] = decode(binaryData, pos)
    pos = newpos;
    ret.push(data);
  }
  return [pos, ret];
}

function parseUint(binaryData, length, start){  
  let num = 0;
  let pos = start;
  let count = length;
  while (count > 0){
    count -= 8;
    // multiply instead of using <<, so uint32/uint64 values above 2^31 don't overflow
    // the 32-bit shift (values beyond Number.MAX_SAFE_INTEGER will still lose precision)
    num = num * 256 + binaryData[pos];
    pos++;
  }
  return [pos, num];
}

function parseInt(binaryData, length, start){  
  let [pos, unum] = parseUint(binaryData, length, start);
  let s = 64 - length;
  //https://github.com/inexorabletash/polyfill/blob/master/typedarray.js
  return [pos, (unum << s) >> s];
}

function parseBinaryArray(binaryData, length, start){  
  let m = binaryData.subarray || binaryData.slice;
  let pos = start + length;
  return [pos, m.call(binaryData, start, pos)];
}

function parseFloat(binaryData, length, start){  
  let bytecount = length / 8;
  // copy the raw bytes into a DataView so we can use the native big-endian float readers
  let view = new DataView(new ArrayBuffer(bytecount));
  for(let i = start; i < start + bytecount; i++){
    view.setUint8(i - start, binaryData[i]);
  }
  let fnName = "getFloat"+length;
  let result = view[fnName](0, false);
  return [start + bytecount, result];
}

function parseUtf8String(data, length, start){  
  //from https://gist.github.com/boushley/5471599
  var result = [];
  var i = start;
  var c = 0;
  var c2 = 0;
  var c3 = 0;

  // If we have a BOM skip it
  if (length >= 3 && data[i] === 0xef && data[i+1] === 0xbb && data[i+2] === 0xbf) {
    i += 3;
  }

  let mark = length + start;
  while (i < mark) {
    c = data[i];
    if (c < 128) {
      result.push(String.fromCharCode(c));
      i++;
    } else if (c > 191 && c < 224) {
      if( i+1 >= data.length ) {
        throw "UTF-8 Decode failed. Two byte character was truncated.";
      }
      c2 = data[i+1];
      result.push(String.fromCharCode( ((c&31)<<6) | (c2&63) ));
      i += 2;
    } else {
      if (i+2 >= data.length) {
        throw "UTF-8 Decode failed. Multi byte character was truncated.";
      }
      c2 = data[i+1];
      c3 = data[i+2];
      result.push(String.fromCharCode( ((c&15)<<12) | ((c2&63)<<6) | (c3&63) ));
      i += 3;
    }
  }
  return [mark, result.join('')];
}

let msgpack = {  
  decode: function(binaryArray){
    return decode(binaryArray)[1];
  }
}

export default msgpack[/code]  Phew! That was a mouthful. We're now ready to intercept the socket's incoming messages and use our decoder for parsing the data.
   Again, we're going to look to the default implementation for inspiration.
   We'll start by abstracting the msgpack parsing into its own module for reusability and then use it inside socket.js. Create a new file web/static/js/binarySocket.js and add the following:
  [code]//  file: web/static/js/binarySocket.js

import msgpack from "./msgpack"

/*lots of console.log() statements for educational purposes in this file, don't forget to remove them in production*/

function convertToBinary(socket){

  let parentOnConnOpen = socket.onConnOpen;

  socket.onConnOpen = function(){
    //setting this to arraybuffer means we won't have to deal with Blob objects
    this.conn.binaryType = 'arraybuffer';
    parentOnConnOpen.apply(this, arguments);
  }

  //we also need to override the onConnMessage function, where we'll be checking
  //for binary data, and delegate to the default implementation if it's not what we expected
  let parentOnConnMessage = socket.onConnMessage;

  socket.onConnMessage = function (rawMessage){
    if(!(rawMessage.data instanceof window.ArrayBuffer)){
      return parentOnConnMessage.apply(this, arguments);
    }
    let msg = decodeMessage(rawMessage.data);
    let topic = msg.topic;
    let event = msg.event;
    let payload = msg.payload;
    let ref = msg.ref;

    this.log("receive", (payload.status || "") + " " + topic + " " + event + " " + (ref && "(" + ref + ")" || ""), payload);
    this.channels.filter(function (channel) {
      return channel.isMember(topic);
    }).forEach(function (channel) {
      return channel.trigger(event, payload, ref);
    });
    this.stateChangeCallbacks.message.forEach(function (callback) {
      return callback(msg);
    });
  }

  return socket;
}

function decodeMessage(rawdata){  
  if(!rawdata){
    return;
  }

  let binary = new Uint8Array(rawdata);
  let data;
  //check for gzip magic bytes
  if(binary.length > 2 && binary[0] === 0x1F && binary[1] === 0x8B){
    let inflate = new window.Zlib.Gunzip(binary);
    data = inflate.decompress();
    console.log('received', binary.length, 'Bytes of gzipped data,', data.length, 'Bytes after inflating');
  }
  else{
    console.log('received', binary.length, 'Bytes of plain msgpacked data');
    data = binary;
  }
  let msg = msgpack.decode(data);
  return msg;
}

export default {  
  convertToBinary
}[/code]   Then let's head over to web/static/js/socket.js and replace all contents with the following
  [code]// file: web/static/js/socket.js

import {Socket} from "phoenix"  
import binarySocket from "./binarySocket"

/*the type=msgpack param is only added to distinguish this connection
from the phoenix live reload connection in the browser's network tab*/  
let socket = new Socket("/socket", {params: {type: "msgpack"}})

socket = binarySocket.convertToBinary(socket);

socket.connect()

//lets join the lobby
let channel = socket.channel("test:lobby", {})

channel.on("small_reply", function(data){  
  console.log("small reply: server responded with", data);
})

channel.on("large_reply", function(data){  
  console.log("large reply: server responded with", data);
})

channel.join()  
  .receive("ok", resp => {
    console.log("Joined successfully", resp)
    channel.push("small")
    channel.push("large")
  })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket[/code]   Lastly we'll need to import socket.js contents into our app.js file. Open up web/static/js/app.js and uncomment the following line:
  [code]// file: web/static/js/app.js

import socket from "./socket"[/code]   Your browser should have refreshed quite a few times by now; let's open the dev tools (F12), head over to the Network tab, switch to websocket frame debugging while keeping the console window open, and then refresh your page once more. Make sure you're debugging the type=msgpack websocket connection (the other one is for Phoenix's own live reload feature).
  If everything worked, you should be seeing the binary frames sent from the server, the text (JSON) frames sent from the client (remember that all the work we did was for the server to be able to send msgpacked data), and the messages in the console happily notifying us every time a response is received (heartbeats for the most part).
   

  That's all folks! We hope you've enjoyed the post and we wish you lots of happy Elixir coding!
   P.S. All code and tests are of course on GitHub.