base table ddl:
CREATE TABLE `protocol_info` (
`chain` varchar(100) NOT NULL COMMENT "",
`protocol_slug` varchar(200) NOT NULL COMMENT "",
`protocol_type` varchar(200) NULL COMMENT "",
`protocol_sub_type` varchar(200) NULL COMMENT "",
`protocol_name` varchar(200) NULL COMMENT "",
`logo` varchar(65533) NULL COMMENT "",
`discord` varchar(65533) NULL COMMENT "",
`github` varchar(65533) NULL COMMENT "",
`twitter` varchar(65533) NULL COMMENT "",
`telegram` varchar(65533) NULL COMMENT "",
`web_url` varchar(65533) NULL COMMENT "",
`description` varchar(65533) NULL COMMENT "",
`updated_at` datetime NULL COMMENT "",
`created_at` datetime NULL COMMENT "",
`created_by` varchar(255) NULL COMMENT "",
`updated_by` varchar(255) NULL COMMENT "",
`categories` varchar(255) NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`chain`, `protocol_slug`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`protocol_slug`) BUCKETS 1
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"compression" = "LZ4"
);
############################################################
CREATE TABLE `ethereum_asset_flow` (
`address` varchar(100) NOT NULL COMMENT "",
`hash` varchar(100) NOT NULL COMMENT "",
`index` int(11) NOT NULL COMMENT "",
`internal_index` varchar(50) NOT NULL COMMENT "",
`block_timestamp` datetime NOT NULL COMMENT "",
`asset_address` varchar(100) NOT NULL COMMENT "",
`flow_type` varchar(10) NOT NULL COMMENT "",
`block_number` int(11) NOT NULL COMMENT "",
`interact_address` varchar(100) NULL COMMENT "",
`nft_token_id` varchar(100) NULL COMMENT "",
`amount_raw` varchar(100) NULL COMMENT "",
INDEX asset_address (`asset_address`) USING BITMAP COMMENT ''
) ENGINE=OLAP
UNIQUE KEY(`address`, `hash`, `index`, `internal_index`, `block_timestamp`, `asset_address`, `flow_type`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`address`) BUCKETS 3000
PROPERTIES (
"replication_num" = "3",
"colocate_with" = "money_flow_group",
"in_memory" = "false",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"compression" = "ZSTD"
);
############################################################
CREATE TABLE `ethereum_asset_flow_near_real_time` (
`address` varchar(100) NOT NULL COMMENT "",
`hash` varchar(100) NOT NULL COMMENT "",
`index` bigint(20) NOT NULL COMMENT "",
`internal_index` varchar(50) NOT NULL COMMENT "",
`block_timestamp` datetime NOT NULL COMMENT "",
`asset_address` varchar(100) NOT NULL COMMENT "",
`flow_type` varchar(10) NOT NULL COMMENT "",
`block_number` bigint(20) NOT NULL COMMENT "",
`interact_address` varchar(100) NULL COMMENT "",
`nft_token_id` varchar(100) NULL COMMENT "",
`amount_raw` varchar(100) NULL COMMENT "",
INDEX asset_address (`asset_address`) USING BITMAP COMMENT ''
) ENGINE=OLAP
UNIQUE KEY(`address`, `hash`, `index`, `internal_index`, `block_timestamp`, `asset_address`, `flow_type`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`address`) BUCKETS 3000
PROPERTIES (
"replication_num" = "3",
"colocate_with" = "money_flow_group",
"in_memory" = "false",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"compression" = "ZSTD"
);
物化视图dml
CREATE MATERIALIZED VIEW `ethereum_gamefi_protocol_latest_stats__mv` (`chain`,
`protocol_slug`,
`protocol_name`,
`logo`,
`volume_1h`,
`volume_24h`,
`volume_7d`,
`volume_30d`,
`number_of_transactions_1h`,
`number_of_active_users_1h`,
`number_of_transactions_24h`,
`number_of_active_users_24h`,
`number_of_transactions_7d`,
`number_of_active_users_7d`,
`number_of_transactions_30d`,
`number_of_active_users_30d`,
`updated_at`)
DISTRIBUTED BY HASH(`protocol_slug`)
REFRESH ASYNC START("2023-12-07 00:00:00") EVERY(INTERVAL 1 HOUR)
PROPERTIES (
"replication_num" = "3",
"replicated_storage" = "true",
"storage_medium" = "HDD"
)
AS WITH `protocol_list` (`chain`,
`protocol_slug`,
`protocol_name`,
`logo`) AS (
SELECT
`prod_silver`.`protocol_info`.`chain`,
`prod_silver`.`protocol_info`.`protocol_slug`,
`prod_silver`.`protocol_info`.`protocol_name`,
`prod_silver`.`protocol_info`.`logo`
FROM
`prod_silver`.`protocol_info`
WHERE
(`prod_silver`.`protocol_info`.`chain` = 'Ethereum')
AND (`prod_silver`.`protocol_info`.`protocol_type` = 'GameFi')
GROUP BY
1,
2,
3,
4) ,
`contract_info` (`contract_address`,
`protocol_slug`,
`protocol_name`,
`chain`) AS (
SELECT
`prod_silver`.`ci`.`contract_address`,
`pl`.`protocol_slug`,
`pl`.`protocol_name`,
`prod_silver`.`ci`.`chain`
FROM
`prod_silver`.`contract_info__mv` AS `ci`
INNER JOIN `protocol_list` AS pl ON
`prod_silver`.`ci`.`protocol_slug` = `pl`.`protocol_slug`
WHERE
`prod_silver`.`ci`.`chain` = 'Ethereum') ,
`filter_method` (`method_id`) AS (
SELECT
'0x23b872dd' AS `method_id`
UNION ALL
SELECT
'0x095ea7b3' AS `method_id`
UNION ALL
SELECT
'0xa9059cbb' AS `method_id`) ,
`source_asset_flow` (`address`,
`hash`,
`interact_address`,
`protocol_slug`,
`asset_address`,
`block_timestamp`,
`amount_raw`) AS (
SELECT
`api`.`a`.`address`,
`api`.`a`.`hash`,
`api`.`a`.`interact_address`,
`ci`.`protocol_slug`,
`api`.`a`.`asset_address`,
`api`.`a`.`block_timestamp`,
CAST(`api`.`a`.`amount_raw` AS DOUBLE) AS `amount_raw`
FROM
`api`.`ethereum_asset_flow` AS `a`
INNER JOIN `contract_info` AS ci ON
`api`.`a`.`address` = `ci`.`contract_address`
WHERE
(`api`.`a`.`block_timestamp` >= (date_sub(current_timestamp(), INTERVAL 30 DAY)))
AND (`api`.`a`.`flow_type` = 'inflow')) ,
`source_interactions` (`address`,
`hash`,
`interact_address`,
`protocol_slug`,
`block_timestamp`) AS (
SELECT
`api`.`bi`.`address`,
`api`.`bi`.`hash`,
`api`.`bi`.`interact_address`,
`ci`.`protocol_slug`,
`api`.`bi`.`block_timestamp`
FROM
`api`.`ethereum_interactions` AS `bi`
INNER JOIN `contract_info` AS ci ON
`api`.`bi`.`address` = `ci`.`contract_address`
WHERE
(`api`.`bi`.`block_timestamp` >= (date_sub(current_timestamp(), INTERVAL 30 DAY)))
AND (`api`.`bi`.`contract_method_id` NOT IN (((((((((((((((((((
SELECT
`filter_method`.`method_id`
FROM
`filter_method`))))))))))))))))))))) ,
`token_info` (`token_address`,
`decimals`) AS (
SELECT
`prod_silver`.`token_chain_info`.`token_address`,
`prod_silver`.`token_chain_info`.`decimals`
FROM
`prod_silver`.`token_chain_info`
WHERE
`prod_silver`.`token_chain_info`.`chain` = 'Ethereum'
UNION
SELECT
`prod_silver`.`token_info`.`token_address`,
`prod_silver`.`token_info`.`decimals`
FROM
`prod_silver`.`token_info`
WHERE
(`prod_silver`.`token_info`.`chain` = 'Ethereum')
AND (`prod_silver`.`token_info`.`token_address` = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee')) ,
`token_price` (`timestamp`,
`token_address`,
`price`) AS (
SELECT
`prod_silver`.`token_price_5min`.`timestamp`,
`prod_silver`.`token_price_5min`.`token_address`,
`prod_silver`.`token_price_5min`.`price`
FROM
`prod_silver`.`token_price_5min`
WHERE
(`prod_silver`.`token_price_5min`.`chain` = 'Ethereum')
AND (`prod_silver`.`token_price_5min`.`timestamp` >= (date_sub(current_timestamp(), INTERVAL 30 DAY)))) ,
`asset_flow_with_token_info` (`address`,
`interact_address`,
`protocol_slug`,
`block_timestamp`,
`asset_address`,
`amount`) AS (
SELECT
`af`.`address`,
`af`.`interact_address`,
`af`.`protocol_slug`,
`af`.`block_timestamp`,
`af`.`asset_address`,
`af`.`amount_raw` / (power(10, `ti`.`decimals`)) AS `amount`
FROM
`source_asset_flow` AS af
LEFT OUTER JOIN `token_info` AS ti ON
`ti`.`token_address` = `af`.`asset_address`) ,
`asset_flow_with_price` (`address`,
`interact_address`,
`protocol_slug`,
`block_timestamp`,
`value`) AS (
SELECT
`af`.`address`,
`af`.`interact_address`,
`af`.`protocol_slug`,
`af`.`block_timestamp`,
`af`.`amount` * `tp`.`price` AS `value`
FROM
`asset_flow_with_token_info` AS af
LEFT OUTER JOIN `token_price` AS tp ON
(`tp`.`token_address` = `af`.`asset_address`)
AND (`tp`.`timestamp` = (from_unixtime((floor((unix_timestamp(`af`.`block_timestamp`)) / 300)) * 300)))) ,
`one_hour_volume_stats` (`protocol_slug`,
`volume_1h`) AS (
SELECT
`asset_flow_with_price`.`protocol_slug`,
sum(`asset_flow_with_price`.`value`) AS `volume_1h`
FROM
`asset_flow_with_price`
WHERE
`asset_flow_with_price`.`block_timestamp` >= (hours_sub(current_timestamp(),
'1'))
GROUP BY
1) ,
`one_day_volume_stats` (`protocol_slug`,
`volume_24h`) AS (
SELECT
`asset_flow_with_price`.`protocol_slug`,
sum(`asset_flow_with_price`.`value`) AS `volume_24h`
FROM
`asset_flow_with_price`
WHERE
`asset_flow_with_price`.`block_timestamp` >= (hours_sub(current_timestamp(),
'24'))
GROUP BY
1) ,
`seven_day_volume_stats` (`protocol_slug`,
`volume_7d`) AS (
SELECT
`asset_flow_with_price`.`protocol_slug`,
sum(`asset_flow_with_price`.`value`) AS `volume_7d`
FROM
`asset_flow_with_price`
WHERE
`asset_flow_with_price`.`block_timestamp` >= (date_sub(current_date(), INTERVAL 7 DAY))
GROUP BY
1) ,
`one_month_volume_stats` (`protocol_slug`,
`volume_30d`) AS (
SELECT
`asset_flow_with_price`.`protocol_slug`,
sum(`asset_flow_with_price`.`value`) AS `volume_30d`
FROM
`asset_flow_with_price`
WHERE
`asset_flow_with_price`.`block_timestamp` >= (date_sub(current_date(), INTERVAL 30 DAY))
GROUP BY
1) ,
`one_hour_interactions_stats` (`protocol_slug`,
`number_of_transactions_1h`,
`number_of_active_users_1h`) AS (
SELECT
`source_interactions`.`protocol_slug`,
count(DISTINCT `source_interactions`.`hash`) AS `number_of_transactions_1h`,
count(DISTINCT `source_interactions`.`interact_address`) AS `number_of_active_users_1h`
FROM
`source_interactions`
WHERE
`source_interactions`.`block_timestamp` >= (hours_sub(current_timestamp(),
'1'))
GROUP BY
1) ,
`one_day_interactions_stats` (`protocol_slug`,
`number_of_transactions_24h`,
`number_of_active_users_24h`) AS (
SELECT
`source_interactions`.`protocol_slug`,
count(DISTINCT `source_interactions`.`hash`) AS `number_of_transactions_24h`,
count(DISTINCT `source_interactions`.`interact_address`) AS `number_of_active_users_24h`
FROM
`source_interactions`
WHERE
`source_interactions`.`block_timestamp` >= (hours_sub(current_timestamp(),
'24'))
GROUP BY
1) ,
`seven_day_interactions_stats` (`protocol_slug`,
`number_of_transactions_7d`,
`number_of_active_users_7d`) AS (
SELECT
`source_interactions`.`protocol_slug`,
count(DISTINCT `source_interactions`.`hash`) AS `number_of_transactions_7d`,
count(DISTINCT `source_interactions`.`interact_address`) AS `number_of_active_users_7d`
FROM
`source_interactions`
WHERE
`source_interactions`.`block_timestamp` >= (date_sub(current_date(), INTERVAL 7 DAY))
GROUP BY
1) ,
`on_month_interactions_stats` (`protocol_slug`,
`number_of_transactions_30d`,
`number_of_active_users_30d`) AS (
SELECT
`source_interactions`.`protocol_slug`,
count(DISTINCT `source_interactions`.`hash`) AS `number_of_transactions_30d`,
count(DISTINCT `source_interactions`.`interact_address`) AS `number_of_active_users_30d`
FROM
`source_interactions`
WHERE
`source_interactions`.`block_timestamp` >= (date_sub(current_date(), INTERVAL 30 DAY))
GROUP BY
1)
SELECT
`a`.`chain`,
`a`.`protocol_slug`,
`a`.`protocol_name`,
`a`.`logo`,
coalesce(`b`.`volume_1h`, 0) AS `volume_1h`,
coalesce(`c`.`volume_24h`, 0) AS `volume_24h`,
coalesce(`d`.`volume_7d`, 0) AS `volume_7d`,
coalesce(`e`.`volume_30d`, 0) AS `volume_30d`,
coalesce(`f`.`number_of_transactions_1h`, 0) AS `number_of_transactions_1h`,
coalesce(`f`.`number_of_active_users_1h`, 0) AS `number_of_active_users_1h`,
coalesce(`g`.`number_of_transactions_24h`, 0) AS `number_of_transactions_24h`,
coalesce(`g`.`number_of_active_users_24h`, 0) AS `number_of_active_users_24h`,
coalesce(`h`.`number_of_transactions_7d`, 0) AS `number_of_transactions_7d`,
coalesce(`h`.`number_of_active_users_7d`, 0) AS `number_of_active_users_7d`,
coalesce(`i`.`number_of_transactions_30d`, 0) AS `number_of_transactions_30d`,
coalesce(`i`.`number_of_active_users_30d`, 0) AS `number_of_active_users_30d`,
current_timestamp() AS `updated_at`
FROM
`protocol_list` AS a
LEFT OUTER JOIN `one_hour_volume_stats` AS b ON
`a`.`protocol_slug` = `b`.`protocol_slug`
LEFT OUTER JOIN `one_day_volume_stats` AS c ON
`a`.`protocol_slug` = `c`.`protocol_slug`
LEFT OUTER JOIN `seven_day_volume_stats` AS d ON
`a`.`protocol_slug` = `d`.`protocol_slug`
LEFT OUTER JOIN `one_month_volume_stats` AS e ON
`a`.`protocol_slug` = `e`.`protocol_slug`
LEFT OUTER JOIN `one_hour_interactions_stats` AS f ON
`a`.`protocol_slug` = `f`.`protocol_slug`
LEFT OUTER JOIN `one_day_interactions_stats` AS g ON
`a`.`protocol_slug` = `g`.`protocol_slug`
LEFT OUTER JOIN `seven_day_interactions_stats` AS h ON
`a`.`protocol_slug` = `h`.`protocol_slug`
LEFT OUTER JOIN `on_month_interactions_stats` AS i ON
`a`.`protocol_slug` = `i`.`protocol_slug`;