Working with Aurora-MySQL

Every time Bipost Sync sends data to AWS, it works together with an API that runs four steps.

1. Create-Alter Schemas

  • If it does not exist, the database is created with:

    • Name: the one you specified in the Factor BI Console.

    • Encoding: cp1252 West European (latin1)

    • Collation: latin1_spanish_ci

  • Tables are created with all the columns found in the source database.

  • Tables are altered to match the source schemas. Columns are never deleted.

  • Only fields specified in customData.json will be populated.
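
Under these defaults, the database creation in step 1 corresponds to DDL like the following. This is a sketch only: `my_database` is a placeholder for the name configured in the Factor BI Console, and the actual statement is issued by the API (in MySQL, the `latin1` character set is cp1252 West European):

```sql
-- Sketch: equivalent of the database the API creates in step 1.
-- `my_database` stands in for the name you specified.
CREATE DATABASE IF NOT EXISTS `my_database`
  CHARACTER SET latin1
  COLLATE latin1_spanish_ci;
```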


2. Prepare Database Before Loading

After schemas are created/altered and before new data is loaded, you can run a stored procedure.

This is very useful if you need to delete, truncate, or make any other changes before data is loaded.

Include all desired statements in a stored procedure named spPostInitial.

The _serviceID and _syncID parameters are automatically supplied by the API.

Example:

DELIMITER $$
DROP PROCEDURE IF EXISTS `spPostInitial`$$
CREATE PROCEDURE `spPostInitial`(
  _serviceID varchar(36),
  _syncID int
  )
postinitial:BEGIN

  DECLARE _SQL  longtext;
  DECLARE _timestamp datetime;
  DECLARE _syncDate date;
  DECLARE _timezone varchar(64);
  DECLARE _rid, _prevId, _count_bipost_sync_info, _count_table int;
  DECLARE _numDays decimal(10,0);
  DECLARE _tableName, _comment1, _comment2 varchar(255);

  SET lc_time_names = 'en_US';
  SET group_concat_max_len = 4294967295;
  SET _count_bipost_sync_info = 0;

  SELECT COUNT(*) INTO _count_bipost_sync_info FROM `bipost_system`.bipost_sync_info WHERE id = _syncID AND serviceID = _serviceID;

  IF IFNULL(_count_bipost_sync_info,0) = 0 THEN
    INSERT INTO logPostInitial (message, serviceID, syncID) VALUES ('execution parameters do not match', _serviceID, _syncID);
    SELECT 'execution parameters do not match' AS message;
    LEAVE postinitial;
  END IF;

  SELECT NULLIF(TRIM(timezone),'') INTO _timezone FROM syncInfoStores WHERE serviceID = _serviceID;
  IF _timezone IS NULL THEN
    SELECT IFNULL(NULLIF(TRIM(timezone),''),'US/Eastern') INTO _timezone FROM syncInfo WHERE serviceID = _serviceID;
  END IF;
  SELECT CONVERT_TZ(syncDate, 'UTC', _timezone) INTO _timestamp FROM `bipost_system`.bipost_sync_info WHERE id = _syncID;
  SELECT IFNULL(CAST(_timestamp AS date),'0000-00-00') INTO _syncDate;

  IF _syncDate <> '0000-00-00' THEN

    INSERT INTO logPostInitial (message, serviceID, syncID) VALUES ('ok', _serviceID, _syncID);

    SET _prevId = 0, _count_table = 0;

    table_id: WHILE(1=1) DO
      SELECT MIN(rid)
        INTO _rid
        FROM `bipost_system`.bipost_sync_table
        WHERE rid > _prevId AND id = _syncID AND comment1 = '-1';

      IF _rid IS NULL THEN
        LEAVE table_id;
      END IF;

      SELECT tableName, comment1
        INTO _tableName, _comment1
        FROM `bipost_system`.bipost_sync_table
        WHERE rid = _rid;

      SELECT COUNT(*) INTO _count_table FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_name = _tableName AND table_schema = schema();
      IF _count_table > 0 THEN
        SET _SQL = CONCAT(
          'TRUNCATE TABLE `', _tableName, '`;');
        SET @SQL = _SQL;
        PREPARE stmt3 FROM @SQL;
        EXECUTE stmt3;
        DEALLOCATE PREPARE stmt3;
        SET _SQL = NULL;
      END IF;

      SET _prevId = _rid, _count_table = 0;
    END WHILE;

  #***************** Delete child tables first *****************

    SET _numDays = 0, _count_table = 0, _tableName = 'claves_articulos';
    SELECT CAST(comment1 AS decimal(10,0))*(-1) INTO _numDays FROM `bipost_system`.bipost_sync_table WHERE id = _syncID AND tableName = _tableName;
    SELECT COUNT(*) INTO _count_table FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_name = _tableName AND table_schema = schema();
    IF _numDays < 0 AND _count_table > 0 THEN
      DELETE claves_articulos
        FROM claves_articulos
        JOIN articulos ON claves_articulos.ARTICULO_ID = articulos.ARTICULO_ID
        WHERE ((CAST(articulos.FECHA_HORA_CREACION AS date) BETWEEN date_add(_syncDate, INTERVAL _numDays day) AND _syncDate) OR (CAST(articulos.FECHA_HORA_ULT_MODIF AS date) BETWEEN date_add(_syncDate, INTERVAL _numDays day) AND _syncDate));
    END IF;

    SET _numDays = 0, _count_table = 0, _tableName = 'dirs_clientes';
    SELECT CAST(comment1 AS decimal(10,0))*(-1) INTO _numDays FROM `bipost_system`.bipost_sync_table WHERE id = _syncID AND tableName = _tableName;
    SELECT COUNT(*) INTO _count_table FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_name = _tableName AND table_schema = schema();
    IF _numDays < 0 AND _count_table > 0 THEN
      DELETE dirs_clientes
        FROM dirs_clientes
        JOIN clientes ON dirs_clientes.CLIENTE_ID = clientes.CLIENTE_ID
        WHERE ((CAST(clientes.FECHA_HORA_CREACION AS date) BETWEEN date_add(_syncDate, INTERVAL _numDays day) AND _syncDate) OR (CAST(clientes.FECHA_HORA_ULT_MODIF AS date) BETWEEN date_add(_syncDate, INTERVAL _numDays day) AND _syncDate));
    END IF;

  #***************** Delete header/parent tables *****************

    SET _numDays = 0, _count_table = 0, _tableName = 'articulos';
    SELECT CAST(comment1 AS decimal(10,0))*(-1) INTO _numDays FROM `bipost_system`.bipost_sync_table WHERE id = _syncID AND tableName = _tableName;
    SELECT COUNT(*) INTO _count_table FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_name = _tableName AND table_schema = schema();
    IF _numDays < 0 AND _count_table > 0 THEN
      DELETE FROM articulos
        WHERE ((CAST(FECHA_HORA_CREACION AS date) BETWEEN date_add(_syncDate, INTERVAL _numDays day) AND _syncDate) OR (CAST(FECHA_HORA_ULT_MODIF AS date) BETWEEN date_add(_syncDate, INTERVAL _numDays day) AND _syncDate));
    END IF;

    SET _numDays = 0, _count_table = 0, _tableName = 'clientes';
    SELECT CAST(comment1 AS decimal(10,0))*(-1) INTO _numDays FROM `bipost_system`.bipost_sync_table WHERE id = _syncID AND tableName = _tableName;
    SELECT COUNT(*) INTO _count_table FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_name = _tableName AND table_schema = schema();
    IF _numDays < 0 AND _count_table > 0 THEN
      DELETE FROM clientes
        WHERE ((CAST(FECHA_HORA_CREACION AS date) BETWEEN date_add(_syncDate, INTERVAL _numDays day) AND _syncDate) OR (CAST(FECHA_HORA_ULT_MODIF AS date) BETWEEN date_add(_syncDate, INTERVAL _numDays day) AND _syncDate));
    END IF;

  #***************** Tables with special comments *****************
    SET _numDays = 0, _count_table = 0, _tableName = 'saldos_co';
    SELECT comment2 INTO _comment2 FROM `bipost_system`.bipost_sync_table WHERE id = _syncID AND tableName = _tableName;
    SELECT COUNT(*) INTO _count_table FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_name = _tableName AND table_schema = schema();
    IF _comment2 = 'last year + ytd' AND _count_table > 0 THEN
      DELETE FROM saldos_co
        WHERE ANO >= YEAR(_syncDate)-1;
    END IF;

    SET _numDays = 0, _count_table = 0, _tableName = 'saldos_in';
    SELECT comment2 INTO _comment2 FROM `bipost_system`.bipost_sync_table WHERE id = _syncID AND tableName = _tableName;
    SELECT COUNT(*) INTO _count_table FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_name = _tableName AND table_schema = schema();
    IF _comment2 = 'last month + mtd' AND _count_table > 0 THEN
      DELETE FROM saldos_in
        WHERE ANO = YEAR(_syncDate) AND MES IN (MONTH(_syncDate)-1, MONTH(_syncDate));
    END IF;

  END IF;

END$$
DELIMITER ;

3. Load Data

Data loading is performed by Aurora. Rows with matching primary keys are updated and the rest are inserted.
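
As an illustration of this update-or-insert behavior, loading a row acts like a MySQL upsert. The sketch below is illustrative only; `mytable` and its columns are hypothetical, and the actual load is performed internally by Aurora:

```sql
-- Illustrative only: loading a row behaves like an upsert on the primary key.
-- `mytable`, `id`, `name`, and `amount` are placeholder names.
INSERT INTO `mytable` (id, name, amount)
VALUES (1, 'abc', 10.0)
ON DUPLICATE KEY UPDATE
  name = VALUES(name),
  amount = VALUES(amount);
```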

You can verify which tables were loaded by querying the aurora_s3_load_history table like this:

SELECT
  LOWER(REPLACE(RIGHT(e.file_name,LOCATE('/',REVERSE(e.file_name))-1),'.csv','')) AS "table",
  CONVERT_TZ(load_timestamp,'UTC','US/Eastern') AS "local_load_timestamp",
  e.load_timestamp,
  SUBSTRING(e.load_prefix, LOCATE('bipostdata',e.load_prefix), LOCATE('/',e.load_prefix, LOCATE('bipostdata',e.load_prefix)+1)-LOCATE('bipostdata',e.load_prefix)) AS "bucket",
  LEFT(e.file_name,LOCATE('/', e.file_name)-1) AS "serviceNumber"
FROM mysql.aurora_s3_load_history AS e
WHERE
--  e.file_name REGEXP 'your-service-number-bc9a-c0123def4567'
  file_name REGEXP 'mytable.csv'
--  AND e.load_timestamp>'2019-01-01 09:41:41'
ORDER BY e.load_timestamp DESC LIMIT 100 ;

4. Transform Data After Loading

After new data is loaded to Aurora-MySQL and before the download process back to your on-premises database begins, you can run a stored procedure.

This is very useful for analyzing and populating new tables.

It is also useful for preparing tables with data sets for the download process back to on-premises.

Include all desired statements in a stored procedure named spPostFinal.

The _serviceID and _syncID parameters are automatically supplied by the API.

Example:

DELIMITER $$
DROP PROCEDURE IF EXISTS `spPostFinal`$$
CREATE PROCEDURE `spPostFinal`(
  _serviceID varchar(36),
  _syncID int
  )
postfinal:BEGIN

  DECLARE _timestamp datetime;
  DECLARE _syncDate date;
  DECLARE _timezone varchar(64);
  DECLARE _count_bipost_sync_info int;

  SET lc_time_names = 'en_US';
  SET group_concat_max_len = 4294967295;

  SELECT COUNT(*) INTO _count_bipost_sync_info FROM `bipost_system`.bipost_sync_info WHERE id = _syncID AND serviceID = _serviceID;

  IF _count_bipost_sync_info = 0 THEN
    INSERT INTO logPostFinal (message, serviceID, syncID) VALUES ('execution parameters do not match', _serviceID, _syncID);
    SELECT 'execution parameters do not match' AS message;
    LEAVE postfinal;
  END IF;

  SELECT NULLIF(TRIM(timezone),'') INTO _timezone FROM syncInfoStores WHERE serviceID = _serviceID;
  IF _timezone IS NULL THEN
    SELECT IFNULL(NULLIF(TRIM(timezone),''),'US/Eastern') INTO _timezone FROM syncInfo WHERE serviceID = _serviceID;
  END IF;
  SELECT CONVERT_TZ(syncDate, 'UTC', _timezone) INTO _timestamp FROM `bipost_system`.bipost_sync_info WHERE id = _syncID;
  SELECT IFNULL(CAST(_timestamp AS date),'0000-00-00') INTO _syncDate;

  UPDATE syncInfo
    SET syncDate = _syncDate, `timestamp` = _timestamp, lastSyncId = _syncID
    WHERE serviceID = _serviceID;

  call spReport1(_serviceID);
  call spReport2(_serviceID);

  INSERT INTO logPostFinal (message, serviceID, syncID) VALUES ('ok', _serviceID, _syncID);
  SELECT 'ok' AS message;

END$$
DELIMITER ;

Execution information

A database named bipost_system is created on Aurora-MySQL and stores information about every sync process. bipost_sync_table stores comment1 and comment2 for every table and sync execution.

Run the following queries to see what information is stored:

SELECT * FROM `bipost_system`.bipost_sync_info ORDER BY id DESC LIMIT 100;
SELECT * FROM `bipost_system`.bipost_sync_table ORDER BY id DESC, rid LIMIT 100;

More examples

See more transformation examples in this repository.