I was investigating the feasibility of putting natively-formatted Twitter data into BigQuery, and got pretty far along before deciding to go in another direction. I found the schema in the twitter-for-bigquery project to be incomplete for my needs, so I made a new schema of my own. I’m making the schema available here in case it’s of use to anyone else.
Here are a couple of important things to understand about the schema:
- Even though the file has the extension `.txt`, it is actually a `.json` file. (I had to change the extension to satisfy WordPress.)
- I changed the “native” coordinate format (a `[lat,lng]` array) to a simple object with `lat` and `lng` fields, because BigQuery does not support nested arrays of arrays, which the native format of the place `bounding_box` field uses. (A sketch of the conversion follows this list.)
- All of the fields are labeled as `NULLABLE`, although I’m sure some could safely be made `REQUIRED`.
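Here is a minimal sketch of the conversion I mean. The helper name is my own invention for illustration; it assumes a tweet dict in the native format and rewrites the polygon in `place.bounding_box.coordinates` (an array of arrays of coordinate pairs) into a flat list of `lat`/`lng` objects that BigQuery can store as a REPEATED RECORD:

```python
# Hypothetical helper: flatten a tweet's place.bounding_box coordinates
# from nested arrays of pairs into a flat list of {"lat", "lng"} objects.
def flatten_bounding_box(tweet):
    place = tweet.get("place")
    if not place or not place.get("bounding_box"):
        return tweet
    box = place["bounding_box"]
    points = []
    # Native format: an array of rings, each an array of coordinate pairs.
    for ring in box.get("coordinates", []):
        for pair in ring:
            # Index order follows the [lat,lng] convention described above;
            # swap the indices if your data uses GeoJSON [lng,lat] order.
            points.append({"lat": pair[0], "lng": pair[1]})
    box["coordinates"] = points
    return tweet
```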
I was able to create tables using this schema in BigQuery, but never got to the point of actually inserting rows, so YMMV.
Also, I created a “helper script” to create a BigQuery schema from JSON data. Essentially, it walks a list of JSON Lines files and builds a BigQuery schema that will either (a) accommodate all of the given records, if they have compatible schemas, or (b) fail with an error. The code for that Python script is here:
```python
import sys, json

filenames = sys.argv[1:]

# Keys and values used in the generated schema JSON.
TYPE = "type"
TYPE_STRING = "STRING"
TYPE_INT = "INTEGER"
TYPE_FLOAT = "FLOAT"
TYPE_BOOL = "BOOLEAN"
TYPE_RECORD = "RECORD"
TYPE_NULL = "NULL"        # placeholder for values whose type is not yet known
MODE = "mode"
MODE_NULLABLE = "NULLABLE"
MODE_REQUIRED = "REQUIRED"
MODE_REPEATED = "REPEATED"
MODE_NULL = "NULL"        # placeholder mode assigned to null values
NAME = "name"
SCHEMA = "schema"         # note: BigQuery's own schema files call this key "fields"

def classify(ns, v):
    # Produce a schema fragment describing the single JSON value v.
    # ns is the path to v, used for diagnostics.
    result = None
    if v is None:
        result = { MODE: MODE_NULL, TYPE: TYPE_NULL }
    elif isinstance(v, str):
        result = { MODE: MODE_NULLABLE, TYPE: TYPE_STRING }
    elif isinstance(v, dict):
        schema = [ ]
        for key in v.keys():
            field = classify(ns + [ key ], v[key])
            field[NAME] = key
            schema.append(field)
        result = { MODE: MODE_NULLABLE, TYPE: TYPE_RECORD, SCHEMA: schema }
    elif isinstance(v, list):
        if len(v) == 0:
            result = { MODE: MODE_REPEATED, TYPE: TYPE_NULL }
        else:
            # Only the first element is inspected, so heterogeneous lists
            # are not detected.
            elements = classify(ns + [ "0" ], v[0])
            if elements[MODE] == MODE_REPEATED:
                sys.stderr.write(f"WARNING: BigQuery cannot model nested REPEATED fields at {ns}. Flattening...\n")
            if elements[TYPE] == TYPE_RECORD:
                result = { MODE: MODE_REPEATED, TYPE: TYPE_RECORD, SCHEMA: elements[SCHEMA] }
            else:
                result = { MODE: MODE_REPEATED, TYPE: elements[TYPE] }
    elif isinstance(v, bool):
        # bool is tested before int because bool is a subclass of int.
        result = { MODE: MODE_NULLABLE, TYPE: TYPE_BOOL }
    elif isinstance(v, int):
        result = { MODE: MODE_NULLABLE, TYPE: TYPE_INT }
    elif isinstance(v, float):
        result = { MODE: MODE_NULLABLE, TYPE: TYPE_FLOAT }
    else:
        raise Exception("Unsupported JSON value at "+str(ns)+": "+str(v))
    return result

def merge(ns, t1, t2):
    # Merge two schema fragments describing the same path ns, widening
    # modes and types as needed, or fail if they are incompatible.
    result = None
    if t1 == t2:
        result = t1
    elif t1[MODE] == MODE_NULL:
        result = t2
    elif t2[MODE] == MODE_NULL:
        result = t1
    else:
        mode = None
        if t1[MODE] == t2[MODE]:
            mode = t1[MODE]
        elif t1[MODE] in (MODE_REQUIRED, MODE_NULLABLE) and t2[MODE] in (MODE_REQUIRED, MODE_NULLABLE):
            # If either side allows nulls, the merged field must allow them too.
            mode = MODE_NULLABLE
        else:
            raise Exception("Incompatible modes at "+str(ns)+": "+t1[MODE]+" and "+t2[MODE])
        type = None
        schema = None
        if t1[TYPE] == t2[TYPE]:
            type = t1[TYPE]
            if t1[TYPE] == TYPE_RECORD:
                # Union the two field lists, recursively merging fields
                # that appear in both records.
                fs1 = { f[NAME]: f for f in t1[SCHEMA] }
                fs2 = { f[NAME]: f for f in t2[SCHEMA] }
                schema = []
                for key in set(list(fs1.keys()) + list(fs2.keys())):
                    field = None
                    if key in fs1 and key not in fs2:
                        field = fs1[key]
                    elif key not in fs1 and key in fs2:
                        field = fs2[key]
                    else:
                        f1 = fs1[key]
                        f2 = fs2[key]
                        field = merge(ns + [ key ], f1, f2)
                    field[NAME] = key
                    schema.append(field)
        elif t1[TYPE] == TYPE_NULL:
            type = t2[TYPE]
            if type == TYPE_RECORD:
                schema = t2[SCHEMA]
        elif t2[TYPE] == TYPE_NULL:
            type = t1[TYPE]
            if type == TYPE_RECORD:
                schema = t1[SCHEMA]
        else:
            raise Exception("Incompatible types at "+str(ns)+": "+t1[TYPE]+" and "+t2[TYPE])
        if schema is None:
            result = { MODE: mode, TYPE: type }
        else:
            result = { MODE: mode, TYPE: type, SCHEMA: schema }
    return result

schema = []

def walk(o):
    # Fold the keys of one top-level record into the global schema.
    global schema
    fs = { f[NAME]: f for f in schema }
    for key in o.keys():
        field = None
        if key in fs:
            f1 = fs[key]
            f2 = classify([ key ], o[key])
            field = merge([ key ], f1, f2)
        else:
            field = classify([ key ], o[key])
        field[NAME] = key
        fs[key] = field
    schema = list(fs.values())

for filename in filenames:
    with open(filename, "r") as f:
        for line in f:
            line = line.strip()
            if line:
                walk(json.loads(line))

print(json.dumps(schema))
```
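To make the script’s behavior concrete, here is a tiny worked example, assuming the script above has been loaded and using made-up records:

```python
# Two example JSON Lines records folded into the global schema.
walk({"id": 1, "text": "hello", "coordinates": None})
walk({"id": 2, "text": "hi", "coordinates": {"lat": 51.5, "lng": -0.1}})
print(json.dumps(schema))
# Output (reformatted here for readability):
# [{"mode": "NULLABLE", "type": "INTEGER", "name": "id"},
#  {"mode": "NULLABLE", "type": "STRING", "name": "text"},
#  {"mode": "NULLABLE", "type": "RECORD",
#   "schema": [{"mode": "NULLABLE", "type": "FLOAT", "name": "lat"},
#              {"mode": "NULLABLE", "type": "FLOAT", "name": "lng"}],
#   "name": "coordinates"}]
```

The null `coordinates` in the first record is held as a `NULL`/`NULL` placeholder until the second record supplies a real type. One caveat if you feed the output straight to the `bq` command-line tool: BigQuery’s schema-file format nests record fields under a key named `fields`, whereas this script uses `schema`, so a rename pass is needed first.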
This script did the heavy lifting of generating the schema, since doing it entirely by hand would have been error-prone. Here are a couple of important things to understand about the script:
- BigQuery supports many more data types than JSON does, so the generated schema’s types may not match the semantics of a JSON field. For example, the script does not attempt to parse string values to determine whether a string is semantically a date or a timestamp.
- The script flattens arrays of arrays and prints a warning, since BigQuery does not support storing nested arrays of arrays.
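For instance, assuming the script’s `classify` function is loaded, a nested-array value (made-up data) collapses like this:

```python
# The inner REPEATED layer triggers the warning on stderr, and the result
# collapses to a single repeated scalar field.
print(classify(["bounding_box"], [[0.1, 51.5], [0.2, 51.6]]))
# stderr: WARNING: BigQuery cannot model nested REPEATED fields at ['bounding_box']. Flattening...
# stdout: {'mode': 'REPEATED', 'type': 'FLOAT'}
```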
To generate the tweet schema, I pulled several hundred tweets from user timelines (including some hand-crafted tweets from a test account to capture specific data features) and ran them through the script to generate the “skeleton” of the schema. I then went back through and hand-edited the schema to make some changes (e.g., `bounding_box` above) and refinements (e.g., `created_at` from `STRING` to `TIMESTAMP`).
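As a concrete illustration of that last refinement, the edit amounts to changing one field’s declared type in the generated JSON (shown here as Python dicts in the script’s output format):

```python
# Generated by the script: Twitter's created_at arrives as a string.
before = {"mode": "NULLABLE", "type": "STRING", "name": "created_at"}
# Hand-edited refinement: declare it as a timestamp instead.
after = {"mode": "NULLABLE", "type": "TIMESTAMP", "name": "created_at"}
```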
In case anyone needs a license, I formally make the above schema and script available under the CC-BY-SA license. Also, while I wouldn’t be sharing the content if I didn’t think it would be useful, you use all of the above at your own risk.
I wouldn’t mind a shout-out in the comments, too, if anyone does use this!