{"id":57493,"date":"2023-06-27T08:33:13","date_gmt":"2023-06-27T03:03:13","guid":{"rendered":"https:\/\/www.tothenew.com\/blog\/?p=57493"},"modified":"2023-06-27T08:33:13","modified_gmt":"2023-06-27T03:03:13","slug":"no-code-data-ingestion-framework-using-apache-flink","status":"publish","type":"post","link":"https:\/\/www.tothenew.com\/blog\/no-code-data-ingestion-framework-using-apache-flink\/","title":{"rendered":"No Code Data Ingestion Framework Using Apache-Flink\u00a0"},"content":{"rendered":"<p><span style=\"font-weight: 400;\">The conveyance of data from many sources to a storage medium where it may be accessed, utilized, and analyzed by an organization is known as data ingestion. Typically, the destination is a data warehouse, data mart, database, or document storage. Sources can include RDBMS such as MySQL, Oracle, and Postgres. The data ingestion layer serves as the foundation for any data engineering framework.<\/span><\/p>\n<p><span style=\"font-weight: 400;\">\u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0 \u00a0<img decoding=\"async\" loading=\"lazy\" class=\"alignnone wp-image-57488\" src=\"\/blog\/wp-ttn-blog\/uploads\/2023\/05\/Screenshot-from-2023-05-17-12-00-17-300x102.png\" alt=\"\" width=\"421\" height=\"143\" srcset=\"\/blog\/wp-ttn-blog\/uploads\/2023\/05\/Screenshot-from-2023-05-17-12-00-17-300x102.png 300w, \/blog\/wp-ttn-blog\/uploads\/2023\/05\/Screenshot-from-2023-05-17-12-00-17-624x213.png 624w, \/blog\/wp-ttn-blog\/uploads\/2023\/05\/Screenshot-from-2023-05-17-12-00-17.png 754w\" sizes=\"(max-width: 421px) 100vw, 421px\" \/><\/span><\/p>\n<h3><b>What is Apache Flink?<\/b><\/h3>\n<p><span style=\"font-weight: 400;\">Apache Flink is a free and open-source distributed stream processing framework that may be used for real-time analytics and massive dataset processing. It was created to handle batch and stream processing cohesively, making it an excellent solution for use cases that require both. Flink accepts data from various sources, including event streams, databases, and file systems. It includes filters, maps, joins, and aggregations among its data transformation and analysis operations. One of Flink&#8217;s primary strengths is its capacity to process streaming data in real-time, which means it can manage large amounts of data with minimal delay. It also enables fault tolerance, ensuring that processing can continue even if the system fails.<\/span><\/p>\n<h3><b>How are we using Flink? \u00a0 <\/b><span style=\"font-weight: 400;\">\u00a0<\/span><span style=\"font-weight: 400;\">\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0<\/span><\/h3>\n<p><span style=\"font-weight: 400;\">\u00a0 \u00a0 \u00a0 <img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-57489\" src=\"\/blog\/wp-ttn-blog\/uploads\/2023\/05\/Screenshot-from-2023-05-26-12-02-31.png\" alt=\"\" width=\"161\" height=\"64\" \/>\u00a0 <\/span><span style=\"font-weight: 400;\">X<\/span><span style=\"font-weight: 400;\">\u00a0 \u00a0 <img decoding=\"async\" loading=\"lazy\" class=\"alignnone wp-image-57490\" src=\"\/blog\/wp-ttn-blog\/uploads\/2023\/05\/Screenshot-from-2023-05-26-12-03-57.png\" alt=\"\" width=\"244\" height=\"59\" \/> \u00a0 <\/span><span style=\"font-weight: 400;\">\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0<\/span><\/p>\n<p><span style=\"font-weight: 400;\">TO THE NEW has automated the data ingestion process using PyFlink. We wrote a Python wrapper on the top of the SQL code, which automates the process of ingestion, where the user only needs to pass the JSON file with the appropriate details that establish the connection between the source and destination so that ingestion can occur. Our framework offers various features such as reading data from RDBMS sources using PyFlink and writing it to S3, HDFS using the Flink tool, and incremental load.<\/span><\/p>\n<h2><b>Basic Features<\/b><\/h2>\n<p><span style=\"font-weight: 400;\">Reading data from RDBMS sources using PyFlink and writing it to S3 , HDFS.<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400;\"><span style=\"font-weight: 400;\">Users can utilize Nimbus-Flink to ingest data from many sources into distinct destinations.<\/span><\/li>\n<li style=\"font-weight: 400;\"><span style=\"font-weight: 400;\">\u00a0There is no need to write any script or code.<\/span><\/li>\n<li style=\"font-weight: 400;\"><span style=\"font-weight: 400;\">The user does not need to care about any complex configurations.<\/span><\/li>\n<li style=\"font-weight: 400;\"><span style=\"font-weight: 400;\">Nimbus-Flink will handle the rest if users give the source and destination details in easily modifiable JSON files.<\/span><\/li>\n<\/ul>\n<p><b>Support Options:<\/b><\/p>\n<table>\n<tbody>\n<tr>\n<td><b>Supported Sources:<\/b><\/td>\n<td><b>Supported Destinations:<\/b><\/td>\n<\/tr>\n<tr>\n<td><span style=\"font-weight: 400;\">MySql<\/span><\/td>\n<td><span style=\"font-weight: 400;\">AWS S3 Bucket, HDFS<\/span><\/td>\n<\/tr>\n<tr>\n<td><span style=\"font-weight: 400;\">Oracle<\/span><\/td>\n<td><span style=\"font-weight: 400;\">AWS S3 Bucket, HDFS<\/span><\/td>\n<\/tr>\n<tr>\n<td><span style=\"font-weight: 400;\">Postgres<\/span><\/td>\n<td><span style=\"font-weight: 400;\">AWS S3 Bucket, HDFS<\/span><\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<p>&nbsp;<\/p>\n<p><img decoding=\"async\" loading=\"lazy\" class=\"alignleft wp-image-57491 size-medium\" src=\"\/blog\/wp-ttn-blog\/uploads\/2023\/05\/flow_diagram-300x183.png\" alt=\"\" width=\"300\" height=\"183\" srcset=\"\/blog\/wp-ttn-blog\/uploads\/2023\/05\/flow_diagram-300x183.png 300w, \/blog\/wp-ttn-blog\/uploads\/2023\/05\/flow_diagram-768x467.png 768w, \/blog\/wp-ttn-blog\/uploads\/2023\/05\/flow_diagram-624x380.png 624w, \/blog\/wp-ttn-blog\/uploads\/2023\/05\/flow_diagram.png 884w\" sizes=\"(max-width: 300px) 100vw, 300px\" \/><\/p>\n<p>&nbsp;<\/p>\n<p><b>How to use Nimbus-Flink?<\/b><\/p>\n<h1><b>Getting Started:<\/b><\/h1>\n<p><b>SETUP<\/b><\/p>\n<p><span style=\"text-decoration: underline;\"><b>Prerequisite<\/b><\/span><\/p>\n<p><span style=\"font-weight: 400;\">Java 8 or 11 (<\/span><span style=\"font-weight: 400;\">sudo apt-get install openjdk-8-jdk)<\/span><\/p>\n<p><span style=\"font-weight: 400;\">Pyflink 1.17.0<\/span><\/p>\n<p><span style=\"font-weight: 400;\">For mysql<\/span><\/p>\n<p><a href=\"https:\/\/mvnrepository.com\/artifact\/org.apache.flink\/flink-connector-jdbc\"><span style=\"font-weight: 400;\">JDBC connector jar<\/span><\/a><span style=\"font-weight: 400;\">\u00a0<\/span><\/p>\n<p><span style=\"font-weight: 400;\">The JDBC connector jar should be within the \u201clib\u201d folder of pyflink 1.15.3 or upper version.<\/span><\/p>\n<p><a href=\"https:\/\/repo1.maven.org\/maven2\/com\/mysql\/mysql-connector-j\/8.0.31\/\"><span style=\"font-weight: 400;\">MySql connector<\/span><\/a><span style=\"font-weight: 400;\"> jar\u00a0<\/span><\/p>\n<p><span style=\"font-weight: 400;\">Mysql connector jar should also be within the \u201clib\u201d folder of pyflink 1.15.3 or upper version.<\/span><\/p>\n<p><a href=\"https:\/\/mvnrepository.com\/artifact\/mysql\/mysql-connector-java\/6.0.3\"><span style=\"font-weight: 400;\">Mysql-java-connector <\/span><\/a><span style=\"font-weight: 400;\">jar\u00a0<\/span><\/p>\n<p><span style=\"font-weight: 400;\">This jar should be residing within the lib folder of Pyflink<\/span><\/p>\n<p><a href=\"https:\/\/mvnrepository.com\/artifact\/com.ververica\/flink-sql-connector-sqlserver-cdc\"><span style=\"font-weight: 400;\">Flink-sql-connector-sql-server<\/span><\/a><span style=\"font-weight: 400;\"> jar<\/span><\/p>\n<p><span style=\"font-weight: 400;\">Flink-sql-connector-sql-server required in pyflink\u2019s lib\u2019s folder.<\/span><\/p>\n<p><span style=\"text-decoration: underline;\"><strong>For Oracle<\/strong><\/span><\/p>\n<p><span style=\"font-weight: 400;\">Connector :: <\/span><a href=\"https:\/\/mvnrepository.com\/artifact\/com.oracle.database.jdbc\/ojdbc8\/21.9.0.0\"><span style=\"font-weight: 400;\">OJDBC<\/span><\/a><span style=\"font-weight: 400;\"> (This jar file should be in lib folder of PyFlink)<\/span><\/p>\n<p><span style=\"text-decoration: underline;\"><strong>For Postgresql<\/strong><\/span><\/p>\n<p><span style=\"font-weight: 400;\">Connector :: <\/span><a href=\"https:\/\/mvnrepository.com\/artifact\/org.apache.flink\/flink-connector-jdbc\"><span style=\"font-weight: 400;\">JDBC<\/span><\/a><span style=\"font-weight: 400;\">\u00a0<\/span><\/p>\n<p><span style=\"font-weight: 400;\">Jar should be in the lib folder of pyflink :: <\/span><a href=\"https:\/\/mvnrepository.com\/artifact\/org.postgresql\/postgresql\/42.5.4\"><span style=\"font-weight: 400;\">postgres-sql-jar<\/span><\/a><span style=\"font-weight: 400;\"> ((\u00a0 (In \u00a0 (lib fo<\/span><\/p>\n<p><span style=\"text-decoration: underline;\"><strong>For the S3 part<\/strong><\/span><\/p>\n<p><a href=\"https:\/\/mvnrepository.com\/artifact\/org.apache.flink\/flink-s3-fs-presto\"><span style=\"font-weight: 400;\">s3-fs-presto jar<\/span><\/a><span style=\"font-weight: 400;\">\u00a0<\/span><\/p>\n<p><span style=\"font-weight: 400;\">s3-fs-presto jar resides within the lib folder of pyflink.<\/span><\/p>\n<p><span style=\"font-weight: 400;\">s3-fs-presto jar resides within its own named folder within the plugins folder of pyflink<br \/>\n<\/span><\/p>\n<p><b>NOTE:<\/b> <span style=\"font-weight: 400;\">S3 bucket name should be in the region in which your account exists both(account and bucket) should be in the same region.<\/span><\/p>\n<p><strong>Mandatory Steps<\/strong><\/p>\n<p><span style=\"font-weight: 400;\">Go to the cd pyflink-1.15.3\/bin\/conf &#8211; &#8211; &#8211; -&gt; open conf.yaml file present in that folder.<br \/>\n<\/span><\/p>\n<p><b>AWS ACCESS KEY AND AWS SECRET KEY:<\/b><\/p>\n<p><span style=\"font-weight: 400;\">#S3_bucket connection details:<\/span><\/p>\n<p><span style=\"font-weight: 400;\">Fs.s3.awsaccesskey:\u00a0 &#8220;&lt;Key&gt;&#8221;<\/span><\/p>\n<p><span style=\"font-weight: 400;\">Fs.s3.awssecretkey:\u00a0 &#8220;&lt;Key&gt;&#8221;<\/span><\/p>\n<p><span style=\"font-weight: 400;\"><br \/>\n<\/span><strong>For Hadoop:<\/strong><\/p>\n<p><span style=\"font-weight: 400;\">Required jar should be in lib folder of pyflink:<\/span><\/p>\n<p><a href=\"https:\/\/repo.maven.apache.org\/maven2\/org\/apache\/flink\/flink-shaded-hadoop-2-uber\/2.4.1-10.0\/\"><span style=\"font-weight: 400;\">Hadoop-shaded-jar<\/span><\/a><\/p>\n<p><b>Important points to consider before configuring:<\/b><\/p>\n<ul>\n<li style=\"font-weight: 400;\"><span style=\"font-weight: 400;\">Path for hdfs hdfs:\/\/host:port\/target-directory-name\/.<\/span><\/li>\n<li style=\"font-weight: 400;\"><span style=\"font-weight: 400;\">Target directory must be present in the hdfs.<\/span><\/li>\n<li style=\"font-weight: 400;\"><span style=\"font-weight: 400;\">Target directory must have all the permissions.<\/span><\/li>\n<li style=\"font-weight: 400;\"><span style=\"font-weight: 400;\">While Ingesting data to the s3 bucket, ensure that your account and s3 bucket are in the same region.<\/span><\/li>\n<li style=\"font-weight: 400;\"><span style=\"font-weight: 400;\">Ingestion using a nimbus flink will work on SCD type1 model.<\/span><\/li>\n<\/ul>\n<p><b>Configuration<\/b><\/p>\n<p><span style=\"font-weight: 400;\">\u00a0Create a <\/span><span style=\"font-weight: 400;\">mysql_hdfs_config.json<\/span><\/p>\n<p><img decoding=\"async\" loading=\"lazy\" class=\"alignnone wp-image-57492 size-medium\" src=\"\/blog\/wp-ttn-blog\/uploads\/2023\/05\/Screenshot-from-2023-05-10-16-21-46-257x300.png\" alt=\"\" width=\"257\" height=\"300\" srcset=\"\/blog\/wp-ttn-blog\/uploads\/2023\/05\/Screenshot-from-2023-05-10-16-21-46-257x300.png 257w, \/blog\/wp-ttn-blog\/uploads\/2023\/05\/Screenshot-from-2023-05-10-16-21-46.png 375w\" sizes=\"(max-width: 257px) 100vw, 257px\" \/><\/p>\n<p><span style=\"font-weight: 400;\">Same as for other sources and destinations, we must create a JSON file with the appropriate details.<\/span><\/p>\n<p><b>Running Nimbus-Flink:<\/b><\/p>\n<p><span style=\"font-weight: 400;\">Python main.py &lt;path_to_json_file&gt;<\/span><\/p>\n<p><b>Conclusion:<\/b><\/p>\n<p><span style=\"font-weight: 400;\">Apache Flink is a powerful real-time data processing tool that has changed how we handle data for many organizations. Its ability to perform complex computations on large data streams has enabled us to build and scale our real-time systems easily.\u00a0<\/span><\/p>\n<p><span style=\"font-weight: 400;\">Nimbus-flink can perform ingestion faster, and when you are using the nimbus-flink user, do not worry about any complex coding. Users just need to write a simple json file to perform the ingestion.<\/span><\/p>\n<p><span style=\"font-weight: 400;\">If you have any further questions, you can comment on the blog. You can also refer to our open-source project:<\/span><span style=\"font-weight: 400;\"> https:\/\/github.com\/tothenew\/nimbus-flink<\/span><\/p>\n<div class=\"ap-custom-wrapper\"><\/div><!--ap-custom-wrapper-->","protected":false},"excerpt":{"rendered":"<p>The conveyance of data from many sources to a storage medium where it may be accessed, utilized, and analyzed by an organization is known as data ingestion. Typically, the destination is a data warehouse, data mart, database, or document storage. Sources can include RDBMS such as MySQL, Oracle, and Postgres. The data ingestion layer serves [&hellip;]<\/p>\n","protected":false},"author":1592,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"iawp_total_views":64},"categories":[1395],"tags":[5265,1853,248,1197,5266,5269,1398,1516,76,5267,1114,5268],"aioseo_notices":[],"_links":{"self":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts\/57493"}],"collection":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/users\/1592"}],"replies":[{"embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/comments?post=57493"}],"version-history":[{"count":7,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts\/57493\/revisions"}],"predecessor-version":[{"id":57676,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts\/57493\/revisions\/57676"}],"wp:attachment":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/media?parent=57493"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/categories?post=57493"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/tags?post=57493"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}