  • Our Blog

    Ongoing observations by End Point Dev people

    Loading JSON Files Into PostgreSQL 9.5

    By Szymon Lipiński
    March 3, 2016

In the previous posts I described a simple database table for storing JSON values, and a way to unpack nested JSON attributes into simple database views. This time I will show how to write a very simple query (thanks to PostgreSQL 9.5) to load the JSON files.
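For context, a sketch of what the stats_data table from the earlier posts could look like. Only the column name data and the constraint name no_overlapping_jsons appear in the queries below; the rest (the jsonb type, the btree_gist extension, the exact expressions) is my reconstruction and may differ from the real schema:

```sql
-- Assumed schema sketch; btree_gist lets the GiST index mix
-- equality operators with the range overlap operator.
CREATE EXTENSION IF NOT EXISTS btree_gist;

CREATE TABLE stats_data (
    data jsonb,
    CONSTRAINT no_overlapping_jsons EXCLUDE USING gist (
        ((data->>'metadata')::json->>'country')      WITH =,
        ((data->>'metadata')::json->>'installation') WITH =,
        tstzrange(
            to_timestamp((data->>'start_ts')::double precision),
            to_timestamp((data->>'end_ts')::double precision)
        ) WITH &&
    )
);
```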

    Here’s a simple Python script to load the database.

This script is made for PostgreSQL 9.4 (it should work with 9.5 too, but it does not use the nice new 9.5 feature described below).

    #!/usr/bin/env python
    
    import os
    import sys
    import logging
    
    try:
        import psycopg2 as pg
        import psycopg2.extras
    except ImportError:
        print "Install psycopg2"
        exit(123)
    
    try:
        import progressbar
    except ImportError:
        print "Install progressbar2"
        exit(123)
    
import json

logger = logging.getLogger()
    
    PG_CONN_STRING = "dbname='blogpost' port='5433'"
    
    data_dir = "data"
    dbconn = pg.connect(PG_CONN_STRING)
    
    logger.info("Loading data from '{}'".format(data_dir))
    
    cursor = dbconn.cursor()
    
    counter = 0
    empty_files = []
    
    class ProgressInfo:
    
        def __init__(self, dir):
            files_no = 0
            for root, dirs, files in os.walk(dir):
                for file in files:
                    if file.endswith(".json"):
                        files_no += 1
            self.files_no = files_no
            print "Found {} files to process".format(self.files_no)
            self.bar = progressbar.ProgressBar(maxval=self.files_no,
                                               widgets=[' [', progressbar.Timer(), '] [', progressbar.ETA(), '] ', progressbar.Bar(),])
    
        def update(self, counter):
            self.bar.update(counter)
    
    pi = ProgressInfo(os.path.expanduser(data_dir))
    
    for root, dirs, files in os.walk(os.path.expanduser(data_dir)):
        for f in files:
            fname = os.path.join(root, f)
    
            if not fname.endswith(".json"):
                continue
            with open(fname) as js:
                data = js.read()
                if not data:
                    empty_files.append(fname)
                    continue
            dd = json.loads(data)
                counter += 1
                pi.update(counter)
                cursor.execute("""
                                INSERT INTO stats_data(data)
                                SELECT %s
                                WHERE NOT EXISTS (SELECT 42
                                                  FROM stats_data
                                                  WHERE
                                                        ((data->>'metadata')::json->>'country')  = %s
                                                    AND ((data->>'metadata')::json->>'installation') = %s
                                                    AND tstzrange(
                                                            to_timestamp((data->>'start_ts')::double precision),
                                                            to_timestamp((data->>'end_ts'  )::double precision)
                                                        ) &&
                                                        tstzrange(
                                                            to_timestamp(%s::text::double precision),
                                                            to_timestamp(%s::text::double precision)
                                                        )
                                                 )
                            """, (data, str(dd['metadata']['country']), str(dd['metadata']['installation']), str(dd['start_ts']), str(dd['end_ts'])))
    
    print ""
    
    logger.debug("Refreshing materialized views")
    cursor.execute("""REFRESH MATERIALIZED VIEW sessions""");
    cursor.execute("""ANALYZE""");
    
    dbconn.commit()
    
    logger.info("Loaded {} files".format(counter))
    logger.info("Found {} empty files".format(len(empty_files)))
    if empty_files:
        logger.info("Empty files:")
        for f in empty_files:
            logger.info(" >>> {}".format(f))
    

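For reference, the script assumes each file contains one JSON document with a nested metadata object and epoch timestamps. The field names below are taken from the queries above; the values are invented:

```json
{
  "metadata": {
    "country": "pl",
    "installation": "krakow-1"
  },
  "start_ts": 1456992000,
  "end_ts": 1456995600
}
```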
I have created two example files in the “data” directory; the output of the script is:

    Found 2 files to process
     [Elapsed Time: 0:00:00] [ETA:  0:00:00] |#####################################|
    

Yay, so it works. What’s more, I can run the script again on the same files, and it will try loading the same data without any errors. Remember the EXCLUDE constraint, which doesn’t allow us to load any JSON for the same country and installation with an overlapping time range? That’s why the query is so long: I need to check that such a JSON is not already in the database before loading it.

This is twice as slow as the next solution. The problem is that it first unpacks the JSON to run the subquery, then inserts the data while checking the same thing again (in fact the insert and the subquery use the same index, the one created by the EXCLUDE constraint).

And then PostgreSQL 9.5 was released, with one great feature: ON CONFLICT DO SOMETHING. A conflict here is a violation of a unique or exclusion constraint, and the EXCLUDE clause on the stats_data table created exactly such a constraint.

One of the possible actions is ON CONFLICT DO NOTHING, and that’s what I used. I changed only one query in the script; instead of this:

    
                cursor.execute("""
                                INSERT INTO stats_data(data)
                                SELECT %s
                                WHERE NOT EXISTS (SELECT 42
                                                  FROM stats_data
                                                  WHERE
                                                        ((data->>'metadata')::json->>'country')  = %s
                                                    AND ((data->>'metadata')::json->>'installation') = %s
                                                    AND tstzrange(
                                                            to_timestamp((data->>'start_ts')::double precision),
                                                            to_timestamp((data->>'end_ts'  )::double precision)
                                                        ) &&
                                                        tstzrange(
                                                            to_timestamp(%s::text::double precision),
                                                            to_timestamp(%s::text::double precision)
                                                        )
                                                 )
                            """, (data, str(dd['metadata']['country']), str(dd['metadata']['installation']), str(dd['start_ts']), str(dd['end_ts'])))
    
    

    It looks like this:

    
                cursor.execute("""
                                INSERT INTO stats_data(data)
                                VALUES (%s)
                                ON CONFLICT ON CONSTRAINT no_overlapping_jsons DO NOTHING
                            """, (data, ))
    
    

    This version requires PostgreSQL 9.5 and will not work on the previous versions.
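For readers who want to try the clause in isolation, here is a toy sketch (not the blog's schema; table and values are made up) showing how DO NOTHING turns a would-be duplicate-key error into a silent no-op:

```sql
-- Toy example: the second insert would normally raise a
-- unique_violation error; DO NOTHING skips the row instead.
CREATE TABLE visitors (
    country text,
    installation text,
    UNIQUE (country, installation)
);

INSERT INTO visitors VALUES ('PL', 'krakow-1');

INSERT INTO visitors VALUES ('PL', 'krakow-1')
ON CONFLICT DO NOTHING;  -- row already there: skipped, no error

SELECT count(*) FROM visitors;  -- returns 1
```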

It is twice as fast as the original, and works as expected: I can run it on already loaded files, and it will not load them again. This way, when I use rsync to download the new files, I can just run the script and it will load only the new files into the database.

Loading 88k JSON files using the production version of the script with the first query takes over a minute.

    Loading the files using the second version takes less than 30 seconds.

    postgres
