@@ -9,7 +9,7 @@ use tokio::task;
99use tokio:: task:: JoinSet ;
1010
1111use crate :: connection:: program:: Program ;
12- use crate :: connection:: MakeConnection ;
12+ use crate :: connection:: { Connection , MakeConnection } ;
1313use crate :: database:: PrimaryConnectionMaker ;
1414use crate :: namespace:: meta_store:: { MetaStore , MetaStoreConnection } ;
1515use crate :: namespace:: { NamespaceName , NamespaceStore } ;
@@ -554,7 +554,15 @@ enum WorkResult {
554554 } ,
555555}
556556
557- async fn backup_meta_store ( meta : & MetaStore ) -> Result < ( ) , Error > {
557+ async fn backup_meta_store (
558+ meta : & MetaStore ,
559+ migration_db : Arc < Mutex < MetaStoreConnection > > ,
560+ ) -> Result < ( ) , Error > {
561+ with_conn_async ( migration_db, |conn| {
562+ Ok ( conn. query_row ( "PRAGMA wal_checkpoint(truncate)" , ( ) , |_| Ok ( ( ) ) ) ?)
563+ } )
564+ . await ?;
565+
558566 if let Some ( mut savepoint) = meta. backup_savepoint ( ) {
559567 if let Err ( e) = savepoint. confirmed ( ) . await {
560568 tracing:: error!( "failed to backup meta store: {e}" ) ;
@@ -572,11 +580,23 @@ async fn backup_meta_store(meta: &MetaStore) -> Result<(), Error> {
572580}
573581
574582async fn backup_namespace ( store : & NamespaceStore , ns : NamespaceName ) -> Result < ( ) , Error > {
575- let savepoint = store
576- . with ( ns. clone ( ) , |ns| ns. db . backup_savepoint ( ) )
583+ let ( savepoint, conn_maker) = store
584+ . with ( ns. clone ( ) , |ns| {
585+ let sp = ns. db . backup_savepoint ( ) ;
586+ let conn_maker = ns. db . connection_maker ( ) ;
587+ ( sp, conn_maker)
588+ } )
577589 . await
578590 . map_err ( |e| Error :: NamespaceLoad ( Box :: new ( e) ) ) ?;
579591
592+ conn_maker
593+ . create ( )
594+ . await
595+ . map_err ( |e| Error :: NamespaceBackupFailure ( ns. clone ( ) , e. into ( ) ) ) ?
596+ . checkpoint ( )
597+ . await
598+ . map_err ( |e| Error :: NamespaceBackupFailure ( ns. clone ( ) , e. into ( ) ) ) ?;
599+
580600 if let Some ( mut savepoint) = savepoint {
581601 if let Err ( e) = savepoint. confirmed ( ) . await {
582602 return Err ( Error :: NamespaceBackupFailure ( ns, e. into ( ) ) ) ;
@@ -608,14 +628,14 @@ async fn step_job_failure(
608628 namespace_store : NamespaceStore ,
609629) -> WorkResult {
610630 try_step_job ( MigrationJobStatus :: DryRunFailure , async move {
611- with_conn_async ( migration_db, move |conn| {
631+ with_conn_async ( migration_db. clone ( ) , move |conn| {
612632 // TODO ensure here that this transition is valid
613633 // the error must already be there from when we stepped to DryRunFailure
614634 update_job_status ( conn, job_id, MigrationJobStatus :: RunFailure , None )
615635 } )
616636 . await ?;
617637
618- backup_meta_store ( namespace_store. meta_store ( ) ) . await ?;
638+ backup_meta_store ( namespace_store. meta_store ( ) , migration_db ) . await ?;
619639
620640 Ok ( MigrationJobStatus :: RunFailure )
621641 } )
@@ -629,13 +649,13 @@ async fn step_job_waiting_run(
629649 namespace_store : NamespaceStore ,
630650) -> WorkResult {
631651 try_step_job ( MigrationJobStatus :: DryRunSuccess , async move {
632- with_conn_async ( migration_db, move |conn| {
652+ with_conn_async ( migration_db. clone ( ) , move |conn| {
633653 // TODO ensure here that this transition is valid
634654 update_job_status ( conn, job_id, MigrationJobStatus :: WaitingRun , None )
635655 } )
636656 . await ?;
637657
638- backup_meta_store ( namespace_store. meta_store ( ) ) . await ?;
658+ backup_meta_store ( namespace_store. meta_store ( ) , migration_db ) . await ?;
639659
640660 Ok ( MigrationJobStatus :: WaitingRun )
641661 } )
@@ -651,7 +671,7 @@ async fn step_job_dry_run_failure(
651671 ( task_id, error, ns) : ( i64 , String , NamespaceName ) ,
652672) -> WorkResult {
653673 try_step_job ( status, async move {
654- with_conn_async ( migration_db, move |conn| {
674+ with_conn_async ( migration_db. clone ( ) , move |conn| {
655675 let error = format ! ( "task {task_id} for namespace `{ns}` failed with error: {error}" ) ;
656676 update_job_status (
657677 conn,
@@ -662,7 +682,7 @@ async fn step_job_dry_run_failure(
662682 } )
663683 . await ?;
664684
665- backup_meta_store ( namespace_store. meta_store ( ) ) . await ?;
685+ backup_meta_store ( namespace_store. meta_store ( ) , migration_db ) . await ?;
666686 Ok ( MigrationJobStatus :: DryRunFailure )
667687 } )
668688 . await
@@ -675,12 +695,12 @@ async fn step_job_dry_run_success(
675695 namespace_store : NamespaceStore ,
676696) -> WorkResult {
677697 try_step_job ( MigrationJobStatus :: WaitingDryRun , async move {
678- with_conn_async ( migration_db, move |conn| {
698+ with_conn_async ( migration_db. clone ( ) , move |conn| {
679699 job_step_dry_run_success ( conn, job_id)
680700 } )
681701 . await ?;
682702
683- backup_meta_store ( namespace_store. meta_store ( ) ) . await ?;
703+ backup_meta_store ( namespace_store. meta_store ( ) , migration_db ) . await ?;
684704
685705 Ok ( MigrationJobStatus :: DryRunSuccess )
686706 } )
@@ -744,14 +764,12 @@ async fn step_job_run_success(
744764 // backup the schema
745765 backup_namespace ( & namespace_store, schema) . await ?;
746766
747- tokio:: task:: spawn_blocking ( move || {
748- let mut conn = migration_db. lock ( ) ;
749- update_job_status ( & mut conn, job_id, MigrationJobStatus :: RunSuccess , None )
767+ with_conn_async ( migration_db. clone ( ) , move |conn| {
768+ update_job_status ( conn, job_id, MigrationJobStatus :: RunSuccess , None )
750769 } )
751- . await
752- . expect ( "task panicked" ) ?;
770+ . await ?;
753771
754- backup_meta_store ( namespace_store. meta_store ( ) ) . await ?;
772+ backup_meta_store ( namespace_store. meta_store ( ) , migration_db ) . await ?;
755773 Ok ( MigrationJobStatus :: RunSuccess )
756774 } )
757775 . await
0 commit comments