Streaming Prebid to Google BigQuery

This post details a solution for getting Prebid.js data into a real-time dashboard, so that publishers can make quick, well-founded decisions about the advertisers bidding for their ad slots. The data acquired will allow you to understand the following metrics:
  • Average CPM per bidder
  • Average CPM per ad slot
  • Average CPM per page/hostname
  • Number of bids vs. number of wins per advertiser
  • Average response time per bidder, page, etc.
Based on this you will be able to make data-driven decisions about which ads to use on your website to maximize each ad's performance and earnings. We will also show you how to merge the data with Google Analytics to gain additional user insights, such as bid information by geographical location, device type, browser, etc. Many publishers use Prebid.js as a framework because it allows website owners to fetch bids from multiple partners, pass bid information to their ad servers, and display the ads on the page picked by the server. If native Google Analytics support for Prebid.js is turned on, dozens of events are sent on every pageview, which can be a very inefficient solution for larger publishers.

Our Solution

For a recent client we had a challenge: sending the Prebid.js data directly to Google Analytics would have generated millions of additional events per day. The alternative we selected was storing all of the data in a more structured, schema-driven way. The desired outcome was a dashboard with a few key metrics, plus the flexibility to query the data manually to answer more specific business questions. The solution presented is not limited to Prebid.js; there are many cases where you would want to stream data directly to BigQuery, and having massive amounts of data available for real-time querying is invaluable.

Streaming Data into BigQuery

BigQuery allows API users to stream data directly into tables hosted on the platform. Rows streamed to BigQuery become available for querying, analysis, visualization, etc. almost instantly. Data can be streamed to BigQuery using the RESTful API or any of the supported client libraries built on top of it; for this use case we decided to use Python. I'd like to point out that data can be streamed to BigQuery either from the website's backend (server) or from the frontend (JavaScript). The two obstacles we faced with those approaches:
  1. Sending hits from the website's backend requires very good communication and quick release cycles with your web dev team. Small changes, such as adding additional fields or filtering certain requests, may take a long time to update and deploy, especially if your website is released in monthly versions.
  2. Data can also be sent from JavaScript (either on the page or using Google Tag Manager), but that means exposing authentication credentials to the client.
Our solution was to add an AppEngine application between the website and Google BigQuery. This allowed us to hide the authentication data and logic from the web client, while at the same time making it possible to update the streaming process without waiting for website releases.

Diagram of the Solution:

We also avoided the need to update the website itself by sending the requests from a custom HTML tag in Google Tag Manager. To avoid any cost surprises when processing data in AppEngine and streaming it to BigQuery, we implemented sampling logic from the start (visible in the full snippet at the bottom of this post). We also batch the rows we want to stream to BigQuery and send them to AppEngine in larger chunks.

Google Tag Manager Configuration

In Google Tag Manager we created a custom HTML tag that fires when the Prebid.js data becomes available. We rely on the data from the global pbjs object, in particular:
pbjs.getAllWinningBids()
pbjs.getBidResponses()
The data is then sent as a POST request to our AppEngine application.
jQuery.post("https://YOUR-PROJECT-ID.appspot.com/bq-streamer", {"bq":JSON.stringify(bqArray)})
To make the data even more useful, we decided to enrich every row with additional Google Analytics data such as the Client Id, page path, and hostname. These fields will later allow us to join BigQuery's Prebid table with Google Analytics data. The full custom HTML snippet is available at the bottom of this post.
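In the full snippet the page and hostname fields are left blank for you to fill in; a minimal sketch of the enrichment, reading both values from the browser's location object inside the bid loop (the example values are hypothetical):
bqRow.page = document.location.pathname;      // e.g. "/articles/my-post"
bqRow.hostname = document.location.hostname;  // e.g. "www.example.com"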

Adding Client Id and Merging the Data with Google Analytics

One of the additional fields that we're sending to BigQuery with each row is the value of the Client Id. The Client Id can be accessed in GTM as a variable that reads a 1st party cookie; generally the value is available in a cookie named _ga. There's a really good post by Simo Ahava on how to access the Client Id that goes into more detail. The Client Id should also be passed to your Google Analytics as a user- (or session-) scoped custom dimension. Note that unsampled reports and the BigQuery export of GA data are only available to GA 360 users. One way that standard Google Analytics users can access Client Id data is the Reporting API, by querying the Client Id custom dimension as one of the fields (be aware of sampling).
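If you don't already have such a variable, here's a minimal sketch of a GTM Custom JavaScript variable that extracts the Client Id from the _ga cookie (assuming the default cookie format, e.g. GA1.2.1234567890.1234567890):
function() {
  // find the _ga cookie among the page's cookies
  var match = document.cookie.match(/(?:^|;\s*)_ga=([^;]+)/);
  if (!match) return '';
  // the Client Id is the last two dot-separated fields of the cookie value
  var parts = match[1].split('.');
  return parts.slice(-2).join('.');
}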

Google AppEngine Configuration

Our AppEngine application listens for the requests and makes sure that they are coming from the right source and in the correct format. Valid requests are then handed off to AppEngine tasks, where the rows received in batches are looped through to append a server-side timestamp as well as a unique insertId that BigQuery uses to deduplicate streamed rows. Once the data is in the format we want, the application streams all the rows to BigQuery, where they become readily available for querying. Here is the part of the code that loops through the rows and streams the data to BigQuery:
rows = []
for row in b: # b holds the rows received from the HTTP request
    row['timestamp'] = int(time.time()) # append the server time
    rows.append({
        "insertId": str(uuid.uuid4()), # unique id BigQuery uses to deduplicate rows
        "json": row
    })

table_data_insert_all_request_body = {
    "kind": "bigquery#tableDataInsertAllRequest",
    "skipInvalidRows": True,
    "ignoreUnknownValues": True,
    #"templateSuffix": string,
    "rows": rows
}

bqrequest = bq.tabledata().insertAll(projectId=projectId, datasetId=datasetId, tableId=tableId, body=table_data_insert_all_request_body)
bqresponse = bqrequest.execute(http=http_auth)
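Note that insertAll can report per-row failures in the insertErrors field of its response even when the HTTP request itself succeeds, so it's worth inspecting and logging bqresponse.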

 

Google BigQuery Configuration

The table schema in BigQuery is practically a copy of the object available in Prebid's bid response. We collect all the bidders, identify the winners, store the CPM of every bid, and append some traditional Google Analytics data such as page, hostname, and Client Id. The full schema is spelled out in the table-creation code at the bottom of this post.

Data Studio Reporting

Because Data Studio can connect to BigQuery natively, it made sense to start plotting our tables in reports. A few of the charts we built:
  • Average CPM by bidder: each bidder offers a different CPM, making it easy to see which bidders generally perform better than the others.
  • Who's bidding and who's winning? The number of bids vs. the number of wins per bidder.
  • What's the average CPM based on the ad position on the website?

Usage Analysis and Cost

  1. Google Tag Manager - completely free; the requests created to send the data do not incur any cost.
  2. Google AppEngine - includes 28 free instance-hours per day. From our experience one instance can handle around 5 requests per second with 20+ rows of data each, which works out to almost a million rows of Prebid.js data transferred to BigQuery for free. The other cost that can be incurred here is $0.12 per GB of outgoing bandwidth, and additional instances cost $0.05 per hour.
  3. Google BigQuery - storing data in BigQuery costs either $0.01 or $0.02 per GB per month (depending on the age of the data), and streaming inserts cost $0.05 per GB. So even when streaming millions of rows of data per day, your costs will likely be less than a cup of coffee per day; a rough worked example follows this list.
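As a rough worked example (the row size is our assumption): at 1 million rows per day and roughly 0.5 KB per row, you stream about 0.5 GB per day, i.e. 0.5 GB x $0.05 = $0.025 per day in streaming costs, and you accumulate about 15 GB of new storage per month, or $0.15 to $0.30 per month at the rates above.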
 

Full Journey of a Hit

  1. A user lands on your website
  2. Advertisers start bidding for the ad slots displayed to that user
  3. Once the bids are available to the browser (provided by Prebid.js) they're sent to your AppEngine app
  4. Your application cleans the data and streams it to BigQuery
  5. Bidding data is now available in your BigQuery tables, ready to be queried
  6. Data Studio queries the data and provides real time insights
 

Conclusion

It is really easy to stream data from your website directly to Google BigQuery. This can all be done independently of the website itself and mostly hidden from the client, which allows easy updates and better control over the data that's coming into the tables. In this example we've been talking about Prebid.js, but there are many other sources of data that can be sent to BigQuery directly and later even be merged with your Google Analytics data. Some other possible uses:
  • Debugging. You can send e-commerce data directly to BigQuery to see what transactions may be getting dropped in GA
  • Data from your CRM that is not allowed in Google Analytics (client id, together with email data can be stored in BigQuery)
  • Data that may not fit the GA mold. For example, events that require too many dimensions or that do not follow the traditional Category, Action, Label, Value structure
  • Data that would push the number of your Google Analytics hits into a new pricing tier
 

Additional Code Snippets

Prebid.js Specific Code
*Sampling is implemented by looking at the last digit of the user's Client Id. This ensures that we're not sampling individual "hits", but rather whole sessions and users.
<script>
    var clientId = {{Client Id}}; // your GTM variable that holds the Client Id (see above)
    
	var bqArray = []
  
  	var winningBids = pbjs.getAllWinningBids()
  	var winnerIds = {}
    
    
  	for(var i=0; i < winningBids.length; i++){
      winnerIds[winningBids[i]['adUnitCode']] = winningBids[i]['adId']
    }
  
  	var allResponses = pbjs.getBidResponses()
    
    for(var adUnitCode in allResponses){
       var winner = ""
       if(winnerIds.hasOwnProperty(adUnitCode)){
       		winner = winnerIds[adUnitCode]
       }
      
      for(var i=0; i<allResponses[adUnitCode]['bids'].length; i++){
        var bid = allResponses[adUnitCode]['bids'][i];
        var adWinner = false;
        
        if(bid.adId === winner){
        	adWinner = true
        }
      	        
        var bqRow = {
          "bidWinner": adWinner,
          "adId":bid['adId'],
          "adUnitCode":bid['adUnitCode'],
          "bidder":bid['bidder'],
          "bidderCode":bid['bidderCode'],
          "cpm":parseFloat(bid['cpm']),
          "height":parseInt(bid['height']),
          "pbAg":parseFloat(bid['pbAg']),
          "pbDg":parseFloat(bid['pbDg']),
          "pbHg":parseFloat(bid['pbHg']),
          "pbLg":parseFloat(bid['pbLg']),
          "pbMg":parseFloat(bid['pbMg']),
          "requestId":bid['requestId'],
          "requestTimestamp":parseInt(bid['requestTimestamp']),
          "responseTimestamp":parseInt(bid['responseTimestamp']),
          "statusMessage":bid['statusMessage'],
          "timeToRespond":parseInt(bid['timeToRespond']),
          "width":parseInt(bid['width']),
          "clientId":clientId,
          "page":"",
          "hostname":""
        }
        
        bqArray.push(bqRow)
      }
    }
  	
  //30% sampling, based on the clientId
  var lastClientIdNumber = Number(clientId.substr(clientId.length-1));
  if(lastClientIdNumber<3){
  	jQuery.post("https://YOUR-PROJECT-ID.appspot.com/bq-streamer", {"bq":JSON.stringify(bqArray)})
  }

</script>
AppEngine additional code
main.py
import webapp2, json
from oauth2client.appengine import AppAssertionCredentials
from httplib2 import Http
from apiclient import discovery
import time, uuid
from google.appengine.api import memcache, taskqueue
from datetime import date, timedelta

class MainHandler(webapp2.RequestHandler):
	def post(self):
		# receives the batched rows from the GTM tag and hands them to a task queue
		self.response.headers.add_header("Access-Control-Allow-Origin", "*")
		b = self.request.get("bq")
		task = taskqueue.add(url='/bq-task', params={'bq': b})

class CreateTable(webapp2.RequestHandler):
	def get(self):
		# creates tomorrow's dated table, so it should be triggered once a day
		bq = memcache.get("bq")
		if bq is None:
			bq = discovery.build('bigquery', 'v2', http = Http())
			memcache.set("bq", bq)

		# authorize with the app's default service account
		credentials = AppAssertionCredentials('https://www.googleapis.com/auth/bigquery')
		http_auth = credentials.authorize(Http())

		projectId = ''	# your GCP project id
		datasetId = ''	# your BigQuery dataset id
		tableId = ''	# base table name; the date suffix is appended below

		tomorrow = (date.today() + timedelta(days=1)).strftime("%Y%m%d")

		datedTable = "%s_%s"%(tableId, tomorrow)

		table_body = {
				 "kind": "bigquery#table",
				 "tableReference": {
				    "projectId": projectId,
				    "datasetId": datasetId,
				    "tableId": datedTable
				  },
				
				 "schema": {
				  "fields": [
				   {
				    "name": "timestamp",
				    "type": "INTEGER",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "bidWinner",
				    "type": "BOOLEAN",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "adId",
				    "type": "STRING",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "adUnitCode",
				    "type": "STRING",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "bidder",
				    "type": "STRING",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "bidderCode",
				    "type": "STRING",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "cpm",
				    "type": "FLOAT",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "height",
				    "type": "INTEGER",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "pbAg",
				    "type": "FLOAT",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "pbDg",
				    "type": "FLOAT",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "pbHg",
				    "type": "FLOAT",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "pbLg",
				    "type": "FLOAT",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "pbMg",
				    "type": "FLOAT",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "requestId",
				    "type": "STRING",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "requestTimestamp",
				    "type": "INTEGER",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "responseTimestamp",
				    "type": "INTEGER",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "statusMessage",
				    "type": "STRING",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "timeToRespond",
				    "type": "INTEGER",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "width",
				    "type": "INTEGER",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "clientId",
				    "type": "STRING",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "page",
				    "type": "STRING",
				    "mode": "NULLABLE"
				   },
				   {
				    "name": "hostname",
				    "type": "STRING",
				    "mode": "NULLABLE"
				   }
				  ]
				 }
				}

		request = bq.tables().insert(projectId=projectId, datasetId=datasetId, body=table_body)
		response = request.execute(http=http_auth)

		


class BqHandler(webapp2.RequestHandler):
	def post(self):
		# task queue worker: parses the batched rows and streams them to BigQuery
		b = self.request.get("bq")
		b = json.loads(b)

		if len(b) > 0:
			projectId = ''	# your GCP project id
			datasetId = ''	# your BigQuery dataset id
			tableId = ''	# base table name

			# stream into today's dated table
			today = date.today().strftime("%Y%m%d")
			tableId = "%s_%s"%(tableId, today)

			bq = memcache.get("bq")
			if bq is None:
				bq = discovery.build('bigquery', 'v2', http = Http())
				memcache.set("bq", bq)

			# authorize with the app's default service account
			credentials = AppAssertionCredentials('https://www.googleapis.com/auth/bigquery.insertdata')
			http_auth = credentials.authorize(Http())


			rows = []
			for row in b:
				row['timestamp'] = int(time.time())	# server-side timestamp
				rows.append({
					"insertId": str(uuid.uuid4()),	# unique id BigQuery uses to deduplicate rows
					"json": row
				})

			table_data_insert_all_request_body = {
				  "kind": "bigquery#tableDataInsertAllRequest",
				  "skipInvalidRows": True,
				  "ignoreUnknownValues": True,
				  #"templateSuffix": string,
				  "rows": rows
				}


			bqrequest = bq.tabledata().insertAll(projectId=projectId, datasetId=datasetId, tableId=tableId, body=table_data_insert_all_request_body)
			bqresponse = bqrequest.execute(http=http_auth)



app = webapp2.WSGIApplication([
    ('/bq-streamer', MainHandler),
    ('/bq-task', BqHandler),
    ('/create-table', CreateTable)
], debug=True)
app.yaml
application: YOUR-APP-ID
version: 1
runtime: python27
api_version: 1
threadsafe: yes

handlers:
- url: .*
  script: main.app


libraries:
- name: webapp2
  version: "2.5.2"