Skip to content

Commit d6ebac5

Browse files
committed
remove working file and upload directly
1 parent b15b5ec commit d6ebac5

File tree

2 files changed

+20
-73
lines changed

2 files changed

+20
-73
lines changed

src/main/java/org/embulk/output/sftp/SftpFileOutput.java

Lines changed: 20 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,14 @@ public class SftpFileOutput
3535
private final String userInfo;
3636
private final String host;
3737
private final int port;
38-
private final String workingFileScheme;
3938
private final String pathPrefix;
4039
private final String sequenceFormat;
4140
private final String fileNameExtension;
4241

4342
private final int taskIndex;
4443
private int fileIndex = 0;
45-
private FileObject currentWorkingFile;
46-
private OutputStream currentWorkingFileOutputStream;
44+
private FileObject currentFile;
45+
private OutputStream currentFileOutputStream;
4746

4847
private StandardFileSystemManager initializeStandardFileSystemManager()
4948
{
@@ -100,7 +99,6 @@ private FileSystemOptions initializeFsOptions(PluginTask task)
10099
this.host = task.getHost();
101100
this.port = task.getPort();
102101
this.pathPrefix = task.getPathPrefix();
103-
this.workingFileScheme = task.getWorkingFileScheme();
104102
this.sequenceFormat = task.getSequenceFormat();
105103
this.fileNameExtension = task.getFileNameExtension();
106104
this.taskIndex = taskIndex;
@@ -109,12 +107,12 @@ private FileSystemOptions initializeFsOptions(PluginTask task)
109107
@Override
110108
public void nextFile()
111109
{
112-
closeCurrentWithUpload();
110+
closeCurrentFile();
113111

114112
try {
115-
currentWorkingFile = newWorkingFile(getWorkingFileUri(getOutputFilePath()));
116-
currentWorkingFileOutputStream = currentWorkingFile.getContent().getOutputStream();
117-
logger.info("new working file: {}", currentWorkingFile.getPublicURIString());
113+
currentFile = newSftpFile(getSftpFileUri(getOutputFilePath()));
114+
currentFileOutputStream = currentFile.getContent().getOutputStream();
115+
logger.info("new sftp file: {}", currentFile.getPublicURIString());
118116
}
119117
catch (FileSystemException e) {
120118
logger.error(e.getMessage());
@@ -129,12 +127,12 @@ public void nextFile()
129127
@Override
130128
public void add(Buffer buffer)
131129
{
132-
if (currentWorkingFile == null) {
130+
if (currentFile == null) {
133131
throw new IllegalStateException("nextFile() must be called before poll()");
134132
}
135133

136134
try {
137-
currentWorkingFileOutputStream.write(buffer.array(), buffer.offset(), buffer.limit());
135+
currentFileOutputStream.write(buffer.array(), buffer.offset(), buffer.limit());
138136
}
139137
catch (IOException e) {
140138
logger.error(e.getMessage());
@@ -146,13 +144,13 @@ public void add(Buffer buffer)
146144
@Override
147145
public void finish()
148146
{
149-
closeCurrentWithUpload();
147+
closeCurrentFile();
150148
}
151149

152150
@Override
153151
public void close()
154152
{
155-
closeCurrentWithUpload();
153+
closeCurrentFile();
156154
manager.close();
157155
}
158156

@@ -167,57 +165,24 @@ public TaskReport commit()
167165
return null;
168166
}
169167

170-
private void closeCurrentWithUpload()
168+
169+
private void closeCurrentFile()
171170
{
172-
try {
173-
closeCurrentWorkingFileContent();
174-
uploadCurrentWorkingFileToSftp();
175-
closeCurrentWorkingFile();
171+
if (currentFile == null) {
172+
return;
176173
}
177-
catch (URISyntaxException e) {
178-
logger.error(e.getMessage());
179-
Throwables.propagate(e);
174+
175+
try {
176+
currentFileOutputStream.close();
177+
currentFile.getContent().close();
178+
currentFile.close();
180179
}
181180
catch (IOException e) {
182181
logger.error(e.getMessage());
183182
Throwables.propagate(e);
184183
}
185184
fileIndex++;
186-
currentWorkingFile = null;
187-
}
188-
189-
private void closeCurrentWorkingFileContent()
190-
throws IOException
191-
{
192-
if (currentWorkingFile == null) {
193-
return;
194-
}
195-
currentWorkingFileOutputStream.close();
196-
currentWorkingFile.getContent().close();
197-
}
198-
199-
private void uploadCurrentWorkingFileToSftp()
200-
throws FileSystemException, URISyntaxException
201-
{
202-
if (currentWorkingFile == null) {
203-
return;
204-
}
205-
206-
try (FileObject remoteSftpFile = newSftpFile(getSftpFileUri(getOutputFilePath()))) {
207-
remoteSftpFile.copyFrom(currentWorkingFile, Selectors.SELECT_SELF);
208-
logger.info("Upload: {}", remoteSftpFile.getPublicURIString());
209-
}
210-
}
211-
212-
private void closeCurrentWorkingFile()
213-
throws FileSystemException
214-
{
215-
if (currentWorkingFile == null) {
216-
return;
217-
}
218-
219-
currentWorkingFile.close();
220-
currentWorkingFile.delete();
185+
currentFile = null;
221186
}
222187

223188
private URI getSftpFileUri(String remoteFilePath)
@@ -226,12 +191,6 @@ private URI getSftpFileUri(String remoteFilePath)
226191
return new URI("sftp", userInfo, host, port, remoteFilePath, null, null);
227192
}
228193

229-
private URI getWorkingFileUri(String remoteFilePath)
230-
throws URISyntaxException
231-
{
232-
return new URI(workingFileScheme, null, remoteFilePath, null);
233-
}
234-
235194
private String getOutputFilePath()
236195
{
237196
return pathPrefix + String.format(sequenceFormat, taskIndex, fileIndex) + fileNameExtension;
@@ -242,12 +201,4 @@ private FileObject newSftpFile(URI sftpUri)
242201
{
243202
return manager.resolveFile(sftpUri.toString(), fsOptions);
244203
}
245-
246-
private FileObject newWorkingFile(URI workingFileUri)
247-
throws FileSystemException
248-
{
249-
FileObject workingFile = manager.resolveFile(workingFileUri);
250-
workingFile.createFile();
251-
return workingFile;
252-
}
253204
}

src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,6 @@ public interface PluginTask
5353
@ConfigDefault("600") // 10 munites
5454
public int getSftpConnectionTimeout();
5555

56-
@Config("working_file_schema")
57-
@ConfigDefault("ram")
58-
public String getWorkingFileScheme();
59-
6056
@Config("path_prefix")
6157
public String getPathPrefix();
6258

0 commit comments

Comments
 (0)