Building the soul of a new Transporter adaptor

Building on the previous installment of this series, we now take the first steps toward creating our own custom adaptor for the Compose Transporter.

In the first installment of this series, we dissected Transporter’s simple File adaptor to gain some understanding of how an adaptor is put together. Building on our findings, the aim here is to write the simplest possible adaptor for the IBM Cloudant database. It will consist of an “all in one” source and a sink which writes one message at a time, making no attempt at buffering.

Setup

To make the most of this, it helps if you are reasonably familiar with Go development. I’m using:

$ go version
go version go1.9.2 darwin/amd64

When you come to write your own adaptor, the first thing you need to do is to fork the Compose Transporter GitHub repository.

Log in to your own GitHub account, visit https://github.com/compose/transporter, and hit the Fork button at the top right.

The plan here is to resort to some modest git trickery:

  1. Install the original Compose repo (not our fork!)
  2. Create a local branch in it
  3. Set up our fork as the push remote

This means that as far as Go’s concerned, we’ve installed the official version, but as we make changes we can commit and push our work up to our own fork.

So here goes:

$ go get github.com/compose/transporter
$ cd $GOPATH/src/github.com/compose/transporter
$ git checkout -b cloudant-simple
Switched to a new branch 'cloudant-simple'
$ git remote set-url --push origin git@github.com:GITHUB-USERNAME/transporter.git

The last step can also be done via HTTPS if that’s your thing:

$ git remote set-url --push origin https://github.com/GITHUB-USERNAME/transporter

Adaptors, as we saw in the previous article, live under transporter/adaptor, so let’s create a home for our work:

$ mkdir adaptor/cloudant

We should now be able to push that new directory:

git add adaptor/cloudant  
git commit -m 'Initial code drop for our simple Cloudant adaptor.'  
git push origin cloudant-simple

We should be good to go if that all worked.

The go-cloudant library

Talking to a Cloudant database over HTTP is quite simple, so we could rely on Go’s own HTTP libraries and talk directly to the database API. However, if you have a poke around the adaptors already shipping with Transporter, you’ll notice that they tend to rely on existing database access client libraries for the heavy lifting. This is a good idea for several reasons, but mostly because it keeps adaptors simple.

Cloudant provides officially supported libraries
for several languages, but unfortunately Go isn’t one of them, so we need to look elsewhere. For this exercise, we’ll be using go-cloudant
, which is open source and under active development in the community.

Let’s install the go-cloudant
library:

go get github.com/cloudant-labs/go-cloudant
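
Before we wire it into Transporter, it’s worth seeing what basic use of the library looks like. Here’s a minimal, standalone sketch; the credentials, URL, and database name are placeholders, but CreateClient(), GetOrCreate(), Set(), and Stop() are the same calls the adaptor will use below:

package main

import (
    "log"

    cdt "github.com/cloudant-labs/go-cloudant"
)

func main() {
    // Open an authenticated connection; the final argument is the number
    // of concurrent workers the client uses internally.
    client, err := cdt.CreateClient("admin", "xyzzy", "http://127.0.0.1:5984", 5)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Stop()

    // Select the database, creating it if it doesn't exist.
    db, err := client.GetOrCreate("testdb")
    if err != nil {
        log.Fatal(err)
    }

    // Write a single document.
    if _, err := db.Set(map[string]interface{}{"hello": "world"}); err != nil {
        log.Fatal(err)
    }
}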

Client

Our first step is to implement the Client. Recall from the previous article that the Client implements the Client interface, and its job is to represent the underlying database and to be able to provide a Session: an authenticated, active connection. When we looked at the File adaptor, there wasn’t much to do for this, but our new adaptor will need to do a bit more. Here are the main parts of the file adaptor/cloudant/client.go:

package cloudant

import (  
    "net/url"

    cdt "github.com/cloudant-labs/go-cloudant"
    "github.com/compose/transporter/client"
)

const (  
    // DefaultURI is the default endpoint for CouchDB on the local machine.
    DefaultURI = "cloudant://127.0.0.1:5984"
)

var (  
    _ client.Client = &Client{}
    _ client.Closer = &Client{}
)

This declaration is just an assertion that our Client struct does indeed implement the two interfaces client.Client and client.Closer. Unlike other languages, Go doesn’t have an implements construct.

We then move on to defining some structures for holding the client data and session data:

// Client creates and holds the session to Cloudant
type Client struct {  
    client   *cdt.CouchClient
    database *cdt.Database
    dbName   string
    uri      string
    username string
    password string
}

// Session wraps the access points for consumption by Reader and Writer
type Session struct {  
    client   *cdt.CouchClient
    dbName   string
    database *cdt.Database
}

// ClientOptionFunc is a function that configures a Client.
type ClientOptionFunc func(*Client) error

What isn’t shown here are the various ClientOptionFuncs; they’re just used to set the fields in the Client struct. Check them out in the GitHub repo to see what they look like.
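
To give a flavour, here’s a sketch of what one of them might look like; the real implementations live in adaptor/cloudant/client.go and may do extra validation:

// WithDatabase records the name of the database the adaptor should work with.
// A minimal sketch of the option-function pattern consumed by NewClient() below.
func WithDatabase(name string) ClientOptionFunc {
    return func(c *Client) error {
        c.dbName = name
        return nil
    }
}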

Next is the constructor, NewClient()
:

// NewClient creates a Cloudant client
func NewClient(options ...ClientOptionFunc) (*Client, error) {  
    c := &Client{uri: DefaultURI}

    for _, option := range options {
        if err := option(c); err != nil {
            return nil, err
        }
    }

    if err := c.initConnection(); err != nil {
        return nil, err
    }

    return c, nil
}

This handles all the client option settings and then calls initConnection()
to test the connection, which we’ll get to in a moment.

Next is Connect(), which returns a Session to the framework, and a matching Close() function to shut it down. The Session struct is consumed by the Write and Read functions in the adaptor.

// Connect wraps the underlying session to the Cloudant database
func (c *Client) Connect() (client.Session, error) {  
    return &Session{
        client:   c.client,
        dbName:   c.dbName,
        database: c.database,
    }, nil
}

// Close fulfills the Closer interface and takes care of cleaning up the session
func (c Client) Close() {  
    if c.database != nil {
        c.client.LogOut()
        c.client.Stop()
    }
}

Now we reach the initConnection() function, where the actual connection to Cloudant is made:

func (c *Client) initConnection() error {  
    uri, _ := url.Parse(c.uri)

    // Clean up the URI so we can use it as the input to the Cloudant client
    if c.uri == DefaultURI {
        uri.Scheme = "http" // Local CouchDB
    } else {
        uri.Scheme = "https"
    }
    uri.User = nil

    // Authenticate
    cl, err := cdt.CreateClient(c.username, c.password, uri.String(), 5)
    if err != nil {
        return client.ConnectError{Reason: err.Error()}
    }
    c.client = cl

    // Select the database, creating it if it's not there: both source
    // and sink use this.
    database, err := cl.GetOrCreate(c.dbName)
    if err != nil {
        return client.ConnectError{Reason: err.Error()}
    }
    c.database = database

    return nil
}

This code calls CreateClient()
from the go-cloudant
library, which opens the connection, and then GetOrCreate()
to select the database we’re working with.

It needs to be able to create a new database, as this is the expected behavior for a sink. The method GetOrCreate() handles this, but note that due to Cloudant’s permissions model, this requires account-level credentials and can’t be achieved using an API key.

Adaptor

The next step is the adaptor entry point. Again, recall from the previous installment that we need to hook into the Transporter machinery that lets it create one of our new adaptors from a JavaScript object representation holding any configuration parameters required for us to create a Client
.

It’s found in adaptor/cloudant/cloudant.go
, and consists mainly of boilerplate:

package cloudant

import (  
    "sync"

    "github.com/compose/transporter/adaptor"
    "github.com/compose/transporter/client"
)

const (  
    sampleConfig = `{
        "uri": "${CLOUDANT_URI}",
        "username": "username",
        "password": "password",
        "database": "database",
        // Note: all cloudant URIs must be "cloudant://..."
}`

    description = "a Cloudant adaptor that functions as both source and sink"
)

In this opening preamble, we import the essential Go packages and then define a sample configuration. This gets used by Transporter’s init command when it generates a starter pipeline file.

// cloudant implements adaptor.Adaptor
var _ adaptor.Adaptor = &cloudant{}

// Cloudant is an adaptor that reads and writes records to Cloudant databases
type cloudant struct {  
    adaptor.BaseConfig
    Database string `json:"database"`
    Username string `json:"username"`
    Password string `json:"password"`
    cl       *Client
}

Now we define the parameters for the adaptor: the default Adaptor parameters (via adaptor.BaseConfig), three of our own for the database name, username, and password, and a field to hang our Client connection on.

func init() {  
    adaptor.Add(
        "cloudant",
        func() adaptor.Adaptor {
            return &cloudant{}
        },
    )
}

The init function adds the Cloudant adaptor to the list of available adaptors in Transporter. There’s no connection setup done here; that happens in the next function.

func (c *cloudant) Client() (client.Client, error) {  
    cl, err := NewClient(
        WithURI(c.URI),
        WithDatabase(c.Database),
        WithUser(c.Username),
        WithPassword(c.Password),
    )
    if err != nil {
        return nil, err
    }
    c.cl = cl

    return cl, nil
}

This calls the NewClient() constructor we defined in client.go, passing in our configured parameters.

func (c *cloudant) Reader() (client.Reader, error) {  
    return newReader(), nil
}

func (c *cloudant) Writer(done chan struct{}, wg *sync.WaitGroup) (client.Writer, error) {  
    return newWriter(), nil
}

The Reader and Writer functions hook up the source and sink through a common interface. We’ll get to writing them next, but first let’s complete our Adaptor:

// Description for Cloudant adaptor
func (c *cloudant) Description() string {  
    return description
}

// SampleConfig for Cloudant adaptor
func (c *cloudant) SampleConfig() string {  
    return sampleConfig
}

These complete the Adaptor API, allowing the Transporter to access that description and sample config we set up earlier. This bit of code will be pretty much boilerplate for any adaptor we write. The difference comes in the implementation of the Source and the Sink.

Source

The source needs to implement the client.Reader interface. Our opening code mostly sets up that implementation; the only difference from the earlier boilerplate is that we also import the go-cloudant package:

package cloudant

// The Cloudant Reader uses the Changes() functionality in the Cloudant
// client library.

import (  
    "strings"

    cdt "github.com/cloudant-labs/go-cloudant"
    "github.com/compose/transporter/client"
    "github.com/compose/transporter/message"
    "github.com/compose/transporter/message/ops"
)

var _ client.Reader = &Reader{}

// Reader implements client.Reader
type Reader struct{}

func newReader() client.Reader {  
    return &Reader{}
}

In terms of implementing the Read function, it should read all the records and, on request, provide them in sequence, converted to Transporter Messages. For Cloudant, we’ll read the database’s changes feed.

// Read fulfils the Reader interface.
func (r *Reader) Read(_ map[string]client.MessageSet, filterFn client.NsFilterFunc) client.MessageChanFunc {  
    return func(s client.Session, done chan struct{}) (chan client.MessageSet, error) {
        out := make(chan client.MessageSet)
        session := s.(*Session)

        query := cdt.NewChangesQuery().
            IncludeDocs().
            Build()

        changes, err := session.database.Changes(query)
        if err != nil {
            return nil, err
        }

        go func() {
            defer close(out)

            for {
                select {
                case <-done:
                    return
                case change, more := <-changes:
                    if !more {
                        return
                    }
                    if change != nil && filterFn(session.dbName) {
                        out <- client.MessageSet{Msg: makeMessage(change, session.dbName)}
                    }
                }
            }
        }()

        return out, nil
    }
}

We can rely on the changes feed reader that’s provided by the go-cloudant
library. It has lots of tricks up its sleeve, but in this instance, we’re using it in the most direct possible way: it’s reading the complete changes feed in one batch, but trickling back the documents over the changes
channel. For our purposes, this is just fine, but for production use we’d want to change this to use a continuous feed instead, allowing us to stream documents and follow an evolving source.

func makeMessage(change *cdt.Change, db string) message.Msg {  
    if change.Deleted {
        return message.From(ops.Delete, db, change.Doc)
    }

    if strings.HasPrefix(change.Rev, "1-") {
        return message.From(ops.Insert, db, change.Doc)
    }

    return message.From(ops.Update, db, change.Doc)
}

The makeMessage() function tries to classify each changes feed event into inserts, deletes, and updates. In Cloudant, this isn’t always obvious, as it treats all document modifications (including deletions) as inserts. We’re saying that if a revision id has the prefix 1- it’s an insert. If you’re interested in learning more about how this hangs together, check out the Cloudant documentation.

The Cloudant changes feed is only partially ordered: there is no guarantee that the order in which you see events on the changes feed corresponds to the order in which the documents were modified on the cluster.

Sink

The sink implementation is found in adaptor/cloudant/writer.go
. If we look at the File adaptor’s sink
, it seems tempting to just write our sink like so:

// NOTE: INCORRECT
func (w *Writer) Write(msg message.Msg) func(client.Session) (message.Msg, error) {  
    return func(s client.Session) (message.Msg, error) {
        _, err := s.(*Session).database.Set(msg.Data())
        return msg, err
    }
}

but sadly that’s too good to be true. The File sink can only insert new data, not update or delete. Here’s the sink; note it starts with a similar preamble to the Reader:

package cloudant

// The Cloudant Writer implementation

import (  
    "fmt"

    cdt "github.com/cloudant-labs/go-cloudant"
    "github.com/compose/transporter/client"
    "github.com/compose/transporter/message"
    "github.com/compose/transporter/message/data"
    "github.com/compose/transporter/message/ops"
)

var _ client.Writer = &Writer{}

// Writer implements client.Writer
type Writer struct{}

func newWriter() *Writer {  
    return &Writer{}
}

In our Write function we switch on the message operation type and call dedicated helper functions insertDoc(), updateDoc(), and deleteDoc() accordingly. Those helper functions need to do a little bit of housekeeping. In order to update or delete a Cloudant document, the document must contain both the _id and _rev fields. This is core to Cloudant’s MVCC (Multi-Version Concurrency Control) system. If we get a delete or update operation where the data doesn’t contain both of these fields, the sink returns an error for that message.

// Write inserts, updates or deletes a message
func (w *Writer) Write(msg message.Msg) func(client.Session) (message.Msg, error) {  
    return func(s client.Session) (message.Msg, error) {
        db := s.(*Session).database
        var err error
        switch msg.OP() {
        case ops.Delete:
            err = deleteDoc(db, msg.Data())
        case ops.Update:
            err = updateDoc(db, msg.Data())
        default:
            err = insertDoc(db, msg.Data())
        }

        if err == nil && msg.Confirms() != nil {
            msg.Confirms() <- struct{}{}
        }

        return msg, err
    }
}

This calls the appropriate action for each message it processes. The simplest of those is the insertDoc()
as it has no checks to perform:

func insertDoc(database *cdt.Database, doc data.Data) error {  
    _, err := database.Set(doc)
    return err
}

The updateDoc()
and deleteDoc()
functions, on the other hand, have to check for the presence of the _id
and _rev
fields:

func updateDoc(database *cdt.Database, doc data.Data) error {  
    _, hasID := doc.Has("_id")
    _, hasRev := doc.Has("_rev")
    if hasID && hasRev {
        return insertDoc(database, doc)
    }

    return fmt.Errorf("Document needs both _id and _rev to update")
}

func deleteDoc(database *cdt.Database, doc data.Data) error {  
    _, hasID := doc.Has("_id")
    _, hasRev := doc.Has("_rev")
    if hasID && hasRev {
        // Delete returns nil on success, so hand its result straight back
        return database.Delete(stringField("_id", doc), stringField("_rev", doc))
    }

    return fmt.Errorf("Document needs both _id and _rev to delete")
}

Finally, there’s a utility function, stringField(), for safely extracting a field from the document data and coercing it to a string.

func stringField(key string, doc data.Data) string {  
    switch val := doc.Get(key).(type) {
    case string:
        return val
    default:
        return fmt.Sprintf("%v", val)
    }
}

Registry

The final thing we need to do before we can start testing this is to enter the new adaptor into the adaptor “registry”, in the file adaptor/all/all.go
, like so:

package all

import (  
    // Initialize all adapters by importing this package
    _ "github.com/compose/transporter/adaptor/cloudant"
    _ "github.com/compose/transporter/adaptor/elasticsearch"
    _ "github.com/compose/transporter/adaptor/file"
    _ "github.com/compose/transporter/adaptor/mongodb"
    _ "github.com/compose/transporter/adaptor/postgres"
    _ "github.com/compose/transporter/adaptor/rabbitmq"
    _ "github.com/compose/transporter/adaptor/rethinkdb"
)

Unit tests

Tests are only required if your code contains bugs, right? Wrong. Go’s approach to unit testing is blissfully lightweight. Let’s create some tests so that we can refactor with impunity. We’ll thank ourselves later when we come to extend this work for part 3 of this article series.

We have three components through which data will flow:

  1. The Cloudant database itself
  2. The go-cloudant
    client library
  3. Our new Transporter adaptor

Whilst we don’t need to test the functionality of 1 and 2, we do need to test the end-to-end connectivity. Ultimately, we want to test that our source adaptor can read data from the underlying database and that the sink adaptor can write data to it. It may be possible to mock out the database itself, but for tutorial purposes, mocked tests are harder to understand. Instead, we’re going to run our unit tests against a local, single-node CouchDB instance in a docker container.

CouchDB
has the same API as Cloudant, so for our testing purposes they’re close enough, even with the old version we’ll be using. To run CouchDB in Docker, you obviously need Docker installed.

Run the following commands:

docker run -d -p 5984:5984 --rm --name couchdb couchdb:1.6  
export HOST="http://127.0.0.1:5984"  
curl -XPUT $HOST/_config/admins/admin -d '"xyzzy"'  
curl -XPUT $HOST/testdb -u admin

The first line runs CouchDB 1.6 in Docker, exposing port 5984, which is the standard CouchDB port. The second command saves us some keystrokes. The third command disables CouchDB’s admin party mode by creating an admin user, and the last command creates a database called testdb as the admin user. We should now be able to let our tests interact with the local CouchDB as the user admin with the password xyzzy. The go-cloudant library does not allow non-authenticated (admin party) connections.

The tests contain more lines of code than the adaptor itself, which isn’t unusual. We won’t examine them all; instead, let’s look at the principles they share. The source tests are found in adaptor/cloudant/reader_test.go, which does the following:

  1. Set up a back channel connection to the database: this is just a go-cloudant CreateClient() that we can use to poke test data into the database without using Transporter (a minimal sketch follows this list).
  2. Create a Cloudant adaptor and Connect()
    it to the database.
  3. Use the back channel to store test data in the database.
  4. Call the adaptor’s Read()
    method, which gives us a reader function.
  5. Drain the message channel we get back from calling the reader function with the Session.
  6. If the number of messages on the channel equals the number of test documents, the test is successful.
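
The back channel in step 1 is just a second, independent connection to the test database, made with the same go-cloudant calls the adaptor uses. Here’s a rough sketch; the real tests create a uniquely named database per run and upload the documents in bulk, whereas this version assumes the admin credentials and testdb database from our Docker setup:

    // Back channel: a plain go-cloudant connection used only by the test,
    // entirely outside the Transporter machinery.
    backChannel, err := cdt.CreateClient("admin", "xyzzy", "http://127.0.0.1:5984", 5)
    if err != nil {
        t.Fatalf("failed to create back channel client, %s", err)
    }
    defer backChannel.Stop()

    testDB, err := backChannel.GetOrCreate("testdb")
    if err != nil {
        t.Fatalf("failed to select test database, %s", err)
    }

    // Poke some test documents straight into the database, bypassing the adaptor.
    for i := 0; i < testDocCount; i++ {
        if _, err := testDB.Set(map[string]interface{}{"i": i}); err != nil {
            t.Fatalf("failed to write test document, %s", err)
        }
    }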

The last four steps of that look like this:

// Several setup steps skipped for clarity
    MakeDocs(uploader, docs)
    readFunc := rd.Read(map[string]client.MessageSet{}, func(ns string) bool {
        return true
    })

    changes, err := readFunc(s, done)
    if err != nil {
        t.Fatalf("unexpected Read error, %sn", err)
    }
    var numMsgs int
    for range changes {
        numMsgs++
    }
    if numMsgs != testDocCount {
        t.Errorf("bad message count, expected %d, got %dn", testDocCount, numMsgs)
    }

The sink test shares a lot of the setup, but here we generate test data in the test itself and use the Write() function that the adaptor provides. We rely on a feature of Transporter’s messages: you can request confirmation that they’ve been written. Here are the salient parts:

var wg sync.WaitGroup
    wr, err := CloudantAdaptor.Writer(done, &wg)
    if err != nil {
        t.Errorf("Failed to start Cloudant Writer, %s", err)
    }

    confirms, cleanup := adaptor.MockConfirmWrites()
    defer adaptor.VerifyWriteConfirmed(cleanup, t)

    for i := 0; i < 10; i++ {
        msg := message.From(ops.Insert, "bulk", map[string]interface{}{"foo": i, "i": i})
        if _, err := wr.Write(message.WithConfirms(confirms, msg))(s); err != nil {
            t.Errorf("Write error: %s", err)
        }
    }

If you’re the paranoid type, you could instead use the back channel approach and verify independently that the messages made it to the backend. If you’re the sensible type, you should also test the ops.Update and ops.Delete cases, for which pull requests are most welcome.
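
As a starting point, here’s a sketch of one such case, reusing the writer wr and session s from the sink test above: an ops.Delete message that carries neither _id nor _rev should come back from the sink as an error rather than a write.

    // A delete without _id and _rev should be rejected by the sink.
    badDelete := message.From(ops.Delete, "bulk", map[string]interface{}{"foo": 42})
    if _, err := wr.Write(badDelete)(s); err == nil {
        t.Errorf("expected an error for a delete without _id and _rev")
    }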

In order to run the unit tests for this adaptor, run the following command in the root of the transporter repo:

go test -v ./adaptor/cloudant/...  
=== RUN   TestDescription
--- PASS: TestDescription (0.00s)
=== RUN   TestSampleConfig
--- PASS: TestSampleConfig (0.00s)
=== RUN   TestReader
2018/02/01 10:10:21 Request (attempt: 0) POST http://127.0.0.1:5984/_session  
2018/02/01 10:10:21 Request (attempt: 0) PUT http://127.0.0.1:5984/transporter-bff2...f2d4  
2018/02/01 10:10:21 Request (attempt: 0) POST http://127.0.0.1:5984/_session  
2018/02/01 10:10:21 Request (attempt: 0) PUT http://127.0.0.1:5984/transporter-bff2...f2d4  
2018/02/01 10:10:21 Request (attempt: 0) POST http://127.0.0.1:5984/transporter-bff2...f2d4/_bulk_docs  
2018/02/01 10:10:21 Request (attempt: 0) GET http://127.0.0.1:5984/transporter-bff2...f2d4/_changes?include_docs=true  
2018/02/01 10:10:21 Request (attempt: 0) DELETE http://127.0.0.1:5984/transporter-bff2...f2d4  
2018/02/01 10:10:21 Request (attempt: 0) DELETE http://127.0.0.1:5984/_session  
2018/02/01 10:10:21 Request (attempt: 0) DELETE http://127.0.0.1:5984/_session  
--- PASS: TestReader (0.12s)
=== RUN   TestWriter
2018/02/01 10:10:21 Request (attempt: 0) POST http://127.0.0.1:5984/_session  
2018/02/01 10:10:21 Request (attempt: 0) PUT http://127.0.0.1:5984/transporter-fd87a...6f38  
2018/02/01 10:10:21 Request (attempt: 0) POST http://127.0.0.1:5984/_session  
2018/02/01 10:10:21 Request (attempt: 0) PUT http://127.0.0.1:5984/transporter-fd87a...6f38  
2018/02/01 10:10:21 Request (attempt: 0) POST http://127.0.0.1:5984/transporter-fd87a...6f38  
2018/02/01 10:10:21 Request (attempt: 0) POST http://127.0.0.1:5984/transporter-fd87a...6f38  
2018/02/01 10:10:21 Request (attempt: 0) POST http://127.0.0.1:5984/transporter-fd87a...6f38  
2018/02/01 10:10:21 Request (attempt: 0) POST http://127.0.0.1:5984/transporter-fd87a...6f38  
2018/02/01 10:10:22 Request (attempt: 0) POST http://127.0.0.1:5984/transporter-fd87a...6f38  
2018/02/01 10:10:22 Request (attempt: 0) POST http://127.0.0.1:5984/transporter-fd87a...6f38  
2018/02/01 10:10:22 Request (attempt: 0) POST http://127.0.0.1:5984/transporter-fd87a...6f38  
2018/02/01 10:10:22 Request (attempt: 0) POST http://127.0.0.1:5984/transporter-fd87a...6f38  
2018/02/01 10:10:22 Request (attempt: 0) POST http://127.0.0.1:5984/transporter-fd87a...6f38  
2018/02/01 10:10:22 Request (attempt: 0) POST http://127.0.0.1:5984/transporter-fd87a...6f38  
2018/02/01 10:10:22 Request (attempt: 0) DELETE http://127.0.0.1:5984/transporter-fd87a...6f38  
2018/02/01 10:10:22 Request (attempt: 0) DELETE http://127.0.0.1:5984/_session  
2018/02/01 10:10:22 Request (attempt: 0) DELETE http://127.0.0.1:5984/_session  
--- PASS: TestWriter (0.54s)
PASS  
ok      github.com/compose/transporter/adaptor/cloudant 0.683s

Worked first time, honest!

Running the adaptor

Unit tests are great, but let’s try to use the adaptor in anger, as Compose intended. We did this in the previous article when we tested the File adaptor. Before we dive into that, we need to do a full build:

go install ./cmd/transporter/...

Let’s see if we can now use a File source and a Cloudant sink. You know the drill:

transporter init file cloudant  
Writing pipeline.js...

Open up pipeline.js
in the editor of your choice, and you should see:

var source = file({  
    "uri": "stdout://"
})

var sink = cloudant({  
    "uri": "${CLOUDANT_URI}",
    "username": "username",
    "password": "password",
    "database": "database",
})

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")

So we need to fix that up to match our setup. Let’s run against our dockerized local CouchDB again:

var source = file({  
    "uri": "file://testfile.json"
})

var sink = cloudant({  
    "uri": "cloudant://127.0.0.1:5984",
    "username": "admin",
    "password": "xyzzy",
    "database": "testdb",
})

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")

Let’s try it!

$ transporter run pipeline.js 
2018/02/01 10:14:21 Request (attempt: 0) POST http://127.0.0.1:5984/_session  
2018/02/01 10:14:21 Request (attempt: 0) PUT http://127.0.0.1:5984/testdb  
INFO[0000] starting with metadata map[]                  name=source path=source type=file  
INFO[0000] adaptor Starting...                           name=source path=source type=file  
INFO[0000] boot map[source:file sink:cloudant]           ts=1517480061098523000  
INFO[0000] adaptor Listening...                          name=sink path="source/sink" type=cloudant  
INFO[0000] Read completed                                file=testfile.json  
INFO[0000] adaptor Start finished...                     name=source path=source type=file  
2018/02/01 10:14:21 Request (attempt: 0) POST http://127.0.0.1:5984/testdb  
INFO[0000] adaptor Stopping...                           name=source path=source type=file  
2018/02/01 10:14:21 Request (attempt: 0) POST http://127.0.0.1:5984/testdb  
2018/02/01 10:14:21 Request (attempt: 0) POST http://127.0.0.1:5984/testdb  
2018/02/01 10:14:21 Request (attempt: 0) POST http://127.0.0.1:5984/testdb  
2018/02/01 10:14:21 Request (attempt: 0) POST http://127.0.0.1:5984/testdb  
2018/02/01 10:14:21 Request (attempt: 0) POST http://127.0.0.1:5984/testdb  
2018/02/01 10:14:21 Request (attempt: 0) POST http://127.0.0.1:5984/testdb  
2018/02/01 10:14:21 Request (attempt: 0) POST http://127.0.0.1:5984/testdb  
2018/02/01 10:14:21 Request (attempt: 0) POST http://127.0.0.1:5984/testdb  
2018/02/01 10:14:21 Request (attempt: 0) POST http://127.0.0.1:5984/testdb  
INFO[0001] adaptor Stopped                               name=source path=source type=file  
INFO[0001] adaptor Stopping...                           name=sink path="source/sink" type=cloudant  
INFO[0001] received stop, message buffer is empty, closing...  
INFO[0001] adaptor Listen closed...                      name=sink path="source/sink" type=cloudant  
INFO[0001] adaptor Stopped                               name=sink path="source/sink" type=cloudant  
2018/02/01 10:14:22 Request (attempt: 0) DELETE http://127.0.0.1:5984/_session  
INFO[0001] metrics source records: 10                    path=source ts=1517480062107035000  
INFO[0001] metrics source/sink records: 10               path="source/sink" ts=1517480062107038000  
INFO[0001] exit map[source:file sink:cloudant]           ts=1517480062107040000

Our work here is done. The full source code for the simple Cloudant adaptor can be found here
.

Conclusion

Building on our investigations in the first installment, we’ve managed to implement a simple adaptor for IBM Cloudant, functioning both as a source and as a sink. Mission accomplished.

However, our adaptor has a number of obvious shortcomings, and a few non-obvious ones, too:

  1. It reads the changes feed in one big JSON gulp. This means that it can be a resource hog, even if the go-cloudant
    library tries its best to stream the documents.
  2. It’s not able to continuously tail an evolving source.
  3. The sink writes one document per HTTP request, which is a very inefficient way of loading large numbers of documents.
  4. It does not implement the resume buffer functionality that Transporter has available.
  5. In some cases, we can’t safely use a Cloudant sink with a Cloudant source!

The last point isn’t at all obvious and requires some deep Cloudant-fu to understand. The way our sink writes documents means that the sink database is responsible for generating revision ids. This isn’t what we want: we want the sink to simply take the documents as they are from the source (if the source is a Cloudant database) and retain
their revision ids.

As it stands, when the sink is passed documents that already contain revision ids, it assumes that these represent updates to existing revisions. Since those revisions won’t already be present in the sink database, the updates will be rejected as conflicts. As a result, a Cloudant-Cloudant Transporter pipeline in its current state won’t function correctly.
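
To make that concrete, here’s a sketch of the failure mode using the same go-cloudant calls as before; this is just a fragment, and sinkDB and the revision id are made up for illustration:

doc := map[string]interface{}{
    "_id":  "example-doc",
    "_rev": "2-9d3c30e5b3f0b1a6a1e8f7c2d4e5f6a7", // revision id from the *source* database
    "foo":  "bar",
}
if _, err := sinkDB.Set(doc); err != nil {
    // The sink database has never seen this revision, so the write
    // comes back as a document update conflict.
    log.Println("write rejected:", err)
}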

The Cloudant replicator
has to contend with the same issue for the same reasons, and in order to resolve this we’d need to do what the replicator does in this situation.

In the final installment, we’ll address some of these issues to make our Cloudant adaptor more useful for real ETL workloads.

